Complete Guide with Real-World Examples
You're deep into your retail data project, analyzing transactions for the month.
Suddenly, your manager drops a question that sounds simple, but it's a bit of a thinker:
"Can you check if there were any days where we made no sales at all?"
This isn't just about curiosity: a missing day could be the first sign of a real problem, either in the business itself or in the data pipeline feeding your reports.
Let's say you work for a chain of electronics stores. Each store logs its transactions daily. You're reviewing the national sales file.
You notice that some dates don't show up at all. You dive deeper and realize there are gaps in the sales log. Time to act.
Let's solve this in PySpark, the go-to tool for handling large-scale data processing in modern data engineering pipelines.
We'll solve this using F.sequence, F.explode, F.col, and the min/max aggregates, followed by a left_anti join.
# Step 1: Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max, sequence, explode
# Step 2: Start Spark session
spark = SparkSession.builder.appName("MissingDates").getOrCreate()
# Step 3: Create sample sales data
data = [
("2024-01-01",),
("2024-01-02",),
("2024-01-04",),
("2024-01-06",),
]
df_sales = spark.createDataFrame(data, ["SaleDate"])
df_sales = df_sales.withColumn("SaleDate", col("SaleDate").cast("date"))
# Step 4: Print the DataFrame
df_sales.show() # Console print
display(df_sales) # Notebook-style table (available in Databricks and similar notebook environments)
| SaleDate |
|---|
| 2024-01-01 |
| 2024-01-02 |
| 2024-01-04 |
| 2024-01-06 |
from pyspark.sql.functions import min, max
# Get min and max SaleDate to define the range
date_range = df_sales.agg(
min("SaleDate").alias("StartDate"),
max("SaleDate").alias("EndDate")
).collect()[0]
start_date = date_range["StartDate"]
end_date = date_range["EndDate"]
df_all_dates = spark.createDataFrame([(start_date, end_date)], ["StartDate", "EndDate"]) \
.withColumn("AllDates", sequence(col("StartDate"), col("EndDate"))) \
.withColumn("MissingDate", explode(col("AllDates"))) \
.select("MissingDate")
The sequence() function generates every date between the start and end dates as an array; explode() turns that array into individual rows.
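If you want to see what those two functions produce on their own, here is a minimal standalone sketch. The hard-coded dates and the one-row spark.range(1) trick are purely for illustration, and it reuses the spark session created above:

```python
from pyspark.sql.functions import expr, explode

# Build a one-row DataFrame whose only column is the full array of dates,
# then explode that array into one row per date.
demo = spark.range(1).select(
    expr("sequence(to_date('2024-01-01'), to_date('2024-01-06'))").alias("AllDates")
)
demo.select(explode("AllDates").alias("CalendarDate")).show()
# Expect six rows: 2024-01-01 through 2024-01-06, one per line.
```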
df_missing = df_all_dates.join(
df_sales,
df_all_dates.MissingDate == df_sales.SaleDate,
how="left_anti"
)
This is where the magic happens:
We keep only the dates that are in the calendar but not in the sales log.
Think of it like this: "Show me the days we should have had sales, but didn't."
df_missing.orderBy("MissingDate").show()
| MissingDate |
|---|
| 2024-01-03 |
| 2024-01-05 |
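If anti joins are new to you, here is a sketch of the same logic expressed with an ordinary left join plus a null filter. It reuses df_all_dates, df_sales, and the col import from the snippets above, and assumes SaleDate itself never contains nulls:

```python
# Equivalent formulation: left join, then keep calendar rows with no matching sale.
df_missing_alt = (
    df_all_dates.join(
        df_sales,
        df_all_dates.MissingDate == df_sales.SaleDate,
        how="left"
    )
    .filter(col("SaleDate").isNull())  # no sale matched this calendar date
    .select("MissingDate")
)
df_missing_alt.orderBy("MissingDate").show()
```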
Putting it all together, here is the complete script end to end:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max, sequence, explode
# Step 1: Start Spark session
spark = SparkSession.builder.appName("MissingDates").getOrCreate()
# Step 2: Create sample sales data
data = [
("2024-01-01",),
("2024-01-02",),
("2024-01-04",),
("2024-01-06",),
]
df_sales = spark.createDataFrame(data, ["SaleDate"])
df_sales = df_sales.withColumn("SaleDate", col("SaleDate").cast("date"))
# Step 3: Get min and max date
date_range = df_sales.agg(
min("SaleDate").alias("StartDate"),
max("SaleDate").alias("EndDate")
).collect()[0]
start_date = date_range["StartDate"]
end_date = date_range["EndDate"]
# Step 4: Generate full date range using sequence
df_all_dates = spark.createDataFrame([(start_date, end_date)], ["StartDate", "EndDate"]) \
.withColumn("AllDates", sequence(col("StartDate"), col("EndDate"))) \
.withColumn("MissingDate", explode(col("AllDates"))) \
.select("MissingDate")
# Step 5: Perform anti join to find missing dates
df_missing = df_all_dates.join(
df_sales,
df_all_dates.MissingDate == df_sales.SaleDate,
how="left_anti"
)
# Step 6: Show result
df_missing.orderBy("MissingDate").show()
Prefer to generate the calendar on the driver instead of with sequence()? Here is an alternative that builds the date list in plain Python, which is perfectly reasonable when the date range is small.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DateType
import datetime
# Start Spark session
spark = SparkSession.builder.appName("MissingDates").getOrCreate()
# Step 1: Create input DataFrame
data = [("2024-01-01",), ("2024-01-02",), ("2024-01-04",), ("2024-01-06",)]
df = spark.createDataFrame(data, ["SaleDate"]).withColumn("SaleDate", col("SaleDate").cast(DateType()))
# Step 2: Get min and max dates
min_date = df.agg({"SaleDate": "min"}).collect()[0][0]
max_date = df.agg({"SaleDate": "max"}).collect()[0][0]
# Step 3: Generate list of all dates from min to max
def generate_date_range(start, end):
    days = (end - start).days + 1
    return [(start + datetime.timedelta(days=i),) for i in range(days)]
all_dates = generate_date_range(min_date, max_date)
all_dates_df = spark.createDataFrame(all_dates, ["MissingDate"])
# Step 4: Find missing dates using anti join
missing_dates_df = all_dates_df.join(df.withColumnRenamed("SaleDate", "MissingDate"), on="MissingDate", how="left_anti")
# Step 5: Show result
missing_dates_df.orderBy("MissingDate").show()
This pattern is useful across industries:
| Industry | Use Case Example |
|---|---|
| Banking | Check if interest wasn't posted for any date |
| E-commerce | Identify zero-order days during campaigns |
| IoT Logs | Spot if sensors stopped transmitting |
| Attendance | Detect if a data upload skipped a day |
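Back to the electronics-chain scenario from the beginning: in practice you often need the missing dates per store, not just overall. Here is a sketch of that extension; the StoreId column name and the sample rows are assumptions made for illustration:

```python
from pyspark.sql.functions import col, min, max, sequence, explode

# Hypothetical per-store sales log (column names and rows are illustrative).
store_sales = spark.createDataFrame(
    [("S1", "2024-01-01"), ("S1", "2024-01-03"),
     ("S2", "2024-01-01"), ("S2", "2024-01-02")],
    ["StoreId", "SaleDate"]
).withColumn("SaleDate", col("SaleDate").cast("date"))

# Build one calendar per store, spanning that store's own first and last sale date.
store_calendar = (
    store_sales.groupBy("StoreId")
    .agg(min("SaleDate").alias("StartDate"), max("SaleDate").alias("EndDate"))
    .withColumn("MissingDate", explode(sequence(col("StartDate"), col("EndDate"))))
    .select("StoreId", "MissingDate")
)

# Anti join on (store, date): keep calendar rows that have no matching sale.
# Aliases avoid ambiguous column references, since both sides share StoreId.
store_missing = store_calendar.alias("cal").join(
    store_sales.alias("s"),
    (col("cal.StoreId") == col("s.StoreId")) & (col("cal.MissingDate") == col("s.SaleDate")),
    how="left_anti"
)
store_missing.orderBy("StoreId", "MissingDate").show()
```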
| Task | SQL Approach | PySpark Equivalent |
|---|---|---|
| Date Range | MIN(SaleDate), MAX(SaleDate) | agg(min, max) |
| Date Calendar | Tally table + DATEADD() | sequence() + explode() |
| Find Missing | WHERE NOT IN or LEFT JOIN ... IS NULL | left_anti join |
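For the SQL-minded, the whole thing can also be written in Spark SQL, which supports sequence(), explode(), and LEFT ANTI JOIN natively. A sketch, assuming df_sales from above is registered as a temp view named sales:

```python
# Register the sales DataFrame as a temporary view (the name "sales" is an assumption).
df_sales.createOrReplaceTempView("sales")

spark.sql("""
    WITH bounds AS (
        SELECT MIN(SaleDate) AS start_date, MAX(SaleDate) AS end_date FROM sales
    ),
    calendar AS (
        SELECT explode(sequence(start_date, end_date)) AS MissingDate FROM bounds
    )
    SELECT c.MissingDate
    FROM calendar c
    LEFT ANTI JOIN sales s ON c.MissingDate = s.SaleDate
    ORDER BY c.MissingDate
""").show()
```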
Now that you've mastered finding missing dates, you can apply this calendar-plus-anti-join pattern to any of the scenarios above, or to any sequential data where gaps matter.