Slowly Changing Dimensions (SCD Type 2) is one of those patterns that sounds complicated until you break it down. You have a record. It changes. You want to keep both versions, old and new, with timestamps marking when each was valid.
Most tutorials implement this in SQL or PySpark. But if you're running a medallion pipeline on a single Azure VM or in a Fabric notebook, you don't always need the overhead. Polars handles this cleanly.
## The Core Logic
SCD Type 2 at its heart is three operations:
- **Identify changed records:** compare incoming rows against the current silver layer
- **Expire old rows:** set `valid_to` on records that have changed
- **Insert new rows:** append the new versions with `valid_from = today`, `valid_to = null`
Unchanged records carry forward untouched. New records (no existing match) are inserted fresh.
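As a concrete (hypothetical) illustration, a member whose `plan_code` changes on 2025-06-01 ends up with two rows:

| member_id | plan_code | valid_from | valid_to   | is_current |
|-----------|-----------|------------|------------|------------|
| 42        | GOLD      | 2023-01-15 | 2025-06-01 | false      |
| 42        | PLATINUM  | 2025-06-01 | null       | true       |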
## The Implementation
```python
import polars as pl
from datetime import date


def apply_scd2(
    existing: pl.DataFrame,
    incoming: pl.DataFrame,
    key_cols: list[str],
    tracked_cols: list[str],
    valid_from_col: str = "valid_from",
    valid_to_col: str = "valid_to",
    is_current_col: str = "is_current",
) -> pl.DataFrame:
    """
    Apply SCD Type 2 logic.

    existing     - current silver table (may contain expired rows)
    incoming     - new snapshot of source data
    key_cols     - business key columns (e.g. ["member_id"])
    tracked_cols - columns that trigger a new version when changed
    """
    today = date.today()
    out_cols = key_cols + tracked_cols + [valid_from_col, valid_to_col, is_current_col]

    # Separate current from already-expired rows
    current = existing.filter(pl.col(is_current_col))
    historical = existing.filter(~pl.col(is_current_col))

    incoming = incoming.select(key_cols + tracked_cols)

    # Current rows whose key is absent from incoming carry forward untouched
    carried = current.join(incoming, on=key_cols, how="anti")

    # Join current state against incoming on business key (keys in both)
    joined = current.join(incoming, on=key_cols, how="inner", suffix="_new")

    # Detect changes - any tracked column differs. ne_missing is null-aware:
    # null vs null is unchanged, null vs a value is a real change. A plain
    # != would yield null here, and filter() silently drops null rows.
    change_expr = pl.any_horizontal(
        pl.col(col).ne_missing(pl.col(f"{col}_new")) for col in tracked_cols
    )
    changed = joined.filter(change_expr)
    unchanged = joined.filter(~change_expr).drop(
        [f"{col}_new" for col in tracked_cols]
    )

    # Expire changed rows: close them out as of today
    expired = changed.select(key_cols + tracked_cols + [valid_from_col]).with_columns(
        pl.lit(today).alias(valid_to_col),
        pl.lit(False).alias(is_current_col),
    )

    # Build new versions from the incoming values
    new_versions = (
        changed.select(key_cols + [f"{col}_new" for col in tracked_cols])
        .rename({f"{col}_new": col for col in tracked_cols})
        .with_columns(
            pl.lit(today).alias(valid_from_col),
            pl.lit(None).cast(pl.Date).alias(valid_to_col),
            pl.lit(True).alias(is_current_col),
        )
    )

    # Detect genuinely new records (no existing current match)
    brand_new = incoming.join(
        current.select(key_cols), on=key_cols, how="anti"
    ).with_columns(
        pl.lit(today).alias(valid_from_col),
        pl.lit(None).cast(pl.Date).alias(valid_to_col),
        pl.lit(True).alias(is_current_col),
    )

    # Align column names and order so the vertical concat lines up
    frames = [historical, carried, unchanged, expired, new_versions, brand_new]
    return pl.concat([f.select(out_cols) for f in frames])
```
## Calling It
```python
silver = pl.read_parquet("silver/members/*.parquet")
incoming = pl.read_parquet("bronze/members/latest/*.parquet")

result = apply_scd2(
    existing=silver,
    incoming=incoming,
    key_cols=["member_id"],
    tracked_cols=["plan_code", "coverage_amount", "provider_id"],
)

# write_parquet writes a single file, so target a file path, not a directory
result.write_parquet("silver/members/members.parquet", use_pyarrow=True)
```
## What to Watch
**Composite keys:** pass multiple columns in `key_cols`; the join handles them naturally.
**Null handling:** a plain `!=` between two nulls evaluates to null rather than `False`, and `filter` silently drops null results, so a row whose tracked column is null in both frames would fall out of both the changed and unchanged sets. The null-aware `ne_missing` sidesteps this: two nulls compare equal, and null versus a value counts as a real change. Test this in your environment.
**Deduplication at bronze:** this function assumes `incoming` has exactly one row per key. If your bronze layer can have duplicates (late-arriving records, duplicated feeds), deduplicate before calling this.
**Write strategy:** if you're on Delta Lake, use `MERGE INTO` instead; the ACID guarantees are worth it at scale. This pure-Polars approach is best for Parquet-based pipelines where you control the write lock.
## Why Not SQL?
You could do all of this in a SQL `MERGE` statement. But if your pipeline is already Python-native (reading from APIs, applying business logic, running validations), keeping the SCD logic in the same layer avoids a round-trip to a SQL engine and keeps the transformation testable with pytest.
One test suite, one language, one pipeline.