Complete Guide with Real-World Examples
You're deep into your retail data project, analyzing transactions for the month.
Suddenly, your manager drops a question that sounds simple, but it's a bit of a thinker:
"Can you check if there were any days where we made no sales at all?"
This isn't just about curiosity. A missing day could signal a real problem worth investigating.
Let's say you work for a chain of electronics stores. Each store logs its transactions daily. You're reviewing the national sales file.
You notice the daily totals don't quite add up. You dive deeper and realize there are gaps in the sales log. Time to act.
Let's solve this in PySpark, the go-to tool for handling large-scale data processing in modern data engineering pipelines.
We'll solve this using:

- `F.min` / `F.max` (to find the date range)
- `F.sequence` (to generate the full calendar of dates)
- `F.explode` (to turn the calendar array into rows)
- `F.col`
```python
# Step 1: Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max, sequence, explode

# Step 2: Start Spark session
spark = SparkSession.builder.appName("MissingDates").getOrCreate()

# Step 3: Create sample sales data
data = [
    ("2024-01-01",),
    ("2024-01-02",),
    ("2024-01-04",),
    ("2024-01-06",),
]
df_sales = spark.createDataFrame(data, ["SaleDate"])
df_sales = df_sales.withColumn("SaleDate", col("SaleDate").cast("date"))

# Step 4: Print the DataFrame
df_sales.show()       # Console print
display(df_sales)     # Notebook-style table
```
| SaleDate   |
|------------|
| 2024-01-01 |
| 2024-01-02 |
| 2024-01-04 |
| 2024-01-06 |
```python
from pyspark.sql.functions import min, max

# Get min and max SaleDate to define the range
date_range = df_sales.agg(
    min("SaleDate").alias("StartDate"),
    max("SaleDate").alias("EndDate")
).collect()[0]

start_date = date_range["StartDate"]
end_date = date_range["EndDate"]
```
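If you want to sanity-check the boundaries before building the calendar, a quick illustrative check:

```python
# The collected values come back as Python datetime.date objects
print(type(start_date), start_date)  # <class 'datetime.date'> 2024-01-01
print(type(end_date), end_date)      # <class 'datetime.date'> 2024-01-06
```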
```python
df_all_dates = spark.createDataFrame([(start_date, end_date)], ["StartDate", "EndDate"]) \
    .withColumn("AllDates", sequence(col("StartDate"), col("EndDate"))) \
    .withColumn("MissingDate", explode(col("AllDates"))) \
    .select("MissingDate")
```
The `sequence()` function generates every date between the start and end dates; `explode()` turns that array into individual rows.
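To see what this produces, you can preview the generated calendar (an illustrative check of the `df_all_dates` DataFrame built above):

```python
# Preview the generated calendar: one row per date between StartDate and EndDate
df_all_dates.orderBy("MissingDate").show()

# Expected output (illustrative):
# +-----------+
# |MissingDate|
# +-----------+
# | 2024-01-01|
# | 2024-01-02|
# | 2024-01-03|
# | 2024-01-04|
# | 2024-01-05|
# | 2024-01-06|
# +-----------+
```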
```python
df_missing = df_all_dates.join(
    df_sales,
    df_all_dates.MissingDate == df_sales.SaleDate,
    how="left_anti"
)
```
This is where the magic happens:
We keep only the dates that are in the calendar but not in the sales log.
Think of it like this: "Show me the days we should have had sales, but didn't."
```python
df_missing.orderBy("MissingDate").show()
```
| MissingDate |
|-------------|
| 2024-01-03  |
| 2024-01-05  |
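If your team is more comfortable in SQL, the same anti-join can be written with Spark SQL over temporary views (a sketch, using the DataFrames built above):

```python
# Register the DataFrames as temporary views so they can be queried with SQL
df_all_dates.createOrReplaceTempView("all_dates")
df_sales.createOrReplaceTempView("sales")

spark.sql("""
    SELECT a.MissingDate
    FROM all_dates a
    LEFT ANTI JOIN sales s
      ON a.MissingDate = s.SaleDate
    ORDER BY a.MissingDate
""").show()
```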
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max, sequence, explode

# Step 1: Start Spark session
spark = SparkSession.builder.appName("MissingDates").getOrCreate()

# Step 2: Create sample sales data
data = [
    ("2024-01-01",),
    ("2024-01-02",),
    ("2024-01-04",),
    ("2024-01-06",),
]
df_sales = spark.createDataFrame(data, ["SaleDate"])
df_sales = df_sales.withColumn("SaleDate", col("SaleDate").cast("date"))

# Step 3: Get min and max date
date_range = df_sales.agg(
    min("SaleDate").alias("StartDate"),
    max("SaleDate").alias("EndDate")
).collect()[0]
start_date = date_range["StartDate"]
end_date = date_range["EndDate"]

# Step 4: Generate full date range using sequence
df_all_dates = spark.createDataFrame([(start_date, end_date)], ["StartDate", "EndDate"]) \
    .withColumn("AllDates", sequence(col("StartDate"), col("EndDate"))) \
    .withColumn("MissingDate", explode(col("AllDates"))) \
    .select("MissingDate")

# Step 5: Perform anti join to find missing dates
df_missing = df_all_dates.join(
    df_sales,
    df_all_dates.MissingDate == df_sales.SaleDate,
    how="left_anti"
)

# Step 6: Show result
df_missing.orderBy("MissingDate").show()
```
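As a variation, you can skip the `collect()` round trip and derive the calendar directly from the aggregate row, staying entirely in DataFrame operations (a sketch using the same sample data, not the only way to do it):

```python
# Variation: build the calendar without collect(), then anti-join as before
df_calendar = (
    df_sales.agg(
        min("SaleDate").alias("StartDate"),
        max("SaleDate").alias("EndDate")
    )
    .withColumn("MissingDate", explode(sequence(col("StartDate"), col("EndDate"))))
    .select("MissingDate")
)

df_calendar.join(
    df_sales, df_calendar.MissingDate == df_sales.SaleDate, how="left_anti"
).orderBy("MissingDate").show()
```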
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DateType
import datetime

# Start Spark session
spark = SparkSession.builder.appName("MissingDates").getOrCreate()

# Step 1: Create input DataFrame
data = [("2024-01-01",), ("2024-01-02",), ("2024-01-04",), ("2024-01-06",)]
df = spark.createDataFrame(data, ["SaleDate"]) \
    .withColumn("SaleDate", col("SaleDate").cast(DateType()))

# Step 2: Get min and max dates
min_date = df.agg({"SaleDate": "min"}).collect()[0][0]
max_date = df.agg({"SaleDate": "max"}).collect()[0][0]

# Step 3: Generate list of all dates from min to max
def generate_date_range(start, end):
    days = (end - start).days + 1
    return [(start + datetime.timedelta(days=i),) for i in range(days)]

all_dates = generate_date_range(min_date, max_date)
all_dates_df = spark.createDataFrame(all_dates, ["MissingDate"])

# Step 4: Find missing dates using anti join
missing_dates_df = all_dates_df.join(
    df.withColumnRenamed("SaleDate", "MissingDate"),
    on="MissingDate",
    how="left_anti"
)

# Step 5: Show result
missing_dates_df.orderBy("MissingDate").show()
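Both versions produce the same result. The `sequence()` approach keeps the calendar generation inside Spark, while the `datetime.timedelta` approach builds the list in Python on the driver, which is perfectly fine for day-level ranges but worth keeping in mind if you adapt the pattern to much finer granularities.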
This pattern is useful across industries:
| Industry   | Use Case Example                              |
|------------|-----------------------------------------------|
| Banking    | Check if interest wasn't posted for any date  |
| E-commerce | Identify zero-order days during campaigns     |
| IoT Logs   | Spot if sensors stopped transmitting          |
| Attendance | Detect if data upload skipped a day           |
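The same pattern extends to per-entity checks like the ones above, for example one calendar per store or per sensor. Here is a sketch, assuming a hypothetical `df_store_sales` DataFrame with `StoreId` and `SaleDate` columns:

```python
from pyspark.sql import functions as F

# Build each store's own calendar from its first to its last sale,
# then anti-join the actual sales back to surface the gaps per store
per_store_calendar = (
    df_store_sales.groupBy("StoreId")
    .agg(F.min("SaleDate").alias("StartDate"), F.max("SaleDate").alias("EndDate"))
    .withColumn("MissingDate", F.explode(F.sequence(F.col("StartDate"), F.col("EndDate"))))
    .select("StoreId", "MissingDate")
)

missing_per_store = per_store_calendar.join(
    df_store_sales,
    (per_store_calendar.StoreId == df_store_sales.StoreId)
    & (per_store_calendar.MissingDate == df_store_sales.SaleDate),
    how="left_anti",
)

missing_per_store.orderBy("StoreId", "MissingDate").show()
```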
| Task          | SQL Approach                                   | PySpark Equivalent          |
|---------------|------------------------------------------------|-----------------------------|
| Date Range    | `MIN(SaleDate)`, `MAX(SaleDate)`               | `agg(min, max)`             |
| Date Calendar | Tally table + `DATEADD()`                      | `sequence()` + `explode()`  |
| Find Missing  | `WHERE NOT IN` or `LEFT JOIN` + `IS NULL` check | `left_anti` join            |
Now that you've mastered finding missing dates, you can apply this pattern to any of the gap-detection scenarios above.