Implementing SCD Type 2 in Pure Polars

Slowly changing dimensions are a solved problem, but most implementations lean on Spark or SQL. Here's how to do it cleanly in Polars with about 60 lines of Python.

Slowly Changing Dimensions Type 2 (SCD Type 2) is one of those patterns that sounds complicated until you break it down. You have a record. It changes. You want to keep both versions, old and new, with timestamps marking when each was valid.

Most tutorials implement this in SQL or PySpark. But if you're running a medallion pipeline on a single Azure VM or in a Fabric notebook, you don't always need the overhead. Polars handles this cleanly.

The Core Logic

SCD Type 2 at its heart is three operations:

  1. Identify changed records: compare incoming rows against the current silver layer
  2. Expire old rows: set valid_to on records that have changed
  3. Insert new rows: append the new versions with valid_from = today, valid_to = null

Unchanged records carry forward untouched. New records (no existing match) are inserted fresh.

The Implementation

import polars as pl
from datetime import date

def apply_scd2(
    existing: pl.DataFrame,
    incoming: pl.DataFrame,
    key_cols: list[str],
    tracked_cols: list[str],
    valid_from_col: str = "valid_from",
    valid_to_col: str = "valid_to",
    is_current_col: str = "is_current",
) -> pl.DataFrame:
    """
    Apply SCD Type 2 logic.

    existing  - current silver table (may contain expired rows)
    incoming  - new snapshot of source data
    key_cols  - business key columns (e.g. ["member_id"])
    tracked_cols - columns that trigger a new version when changed
    """
    today = date.today()

    # Separate current from already-expired rows
    current = existing.filter(pl.col(is_current_col))
    historical = existing.filter(~pl.col(is_current_col))

    # Left-join current state against incoming on the business key.
    # A full/outer join would leave nulls in the tracked columns for
    # unmatched rows, and a plain != comparison on nulls returns null,
    # silently dropping those rows from both branches below. The
    # _matched marker tells us whether an incoming row exists at all.
    joined = current.join(
        incoming.select(key_cols + tracked_cols).with_columns(
            pl.lit(True).alias("_matched")
        ),
        on=key_cols,
        how="left",
        suffix="_new",
    )

    # A row is changed only if it matched an incoming row AND a tracked
    # column differs. ne_missing treats null vs. null as equal and
    # null vs. value as different, unlike != which propagates null.
    change_expr = pl.col("_matched").fill_null(False) & pl.any_horizontal(
        [pl.col(c).ne_missing(pl.col(f"{c}_new")) for c in tracked_cols]
    )

    changed = joined.filter(change_expr)
    unchanged = joined.filter(~change_expr).drop(
        [f"{c}_new" for c in tracked_cols] + ["_matched"]
    )

    # Expire changed rows
    expired = changed.select(
        key_cols + tracked_cols + [valid_from_col]
    ).with_columns([
        pl.lit(today).alias(valid_to_col),
        pl.lit(False).alias(is_current_col),
    ])

    # Build new versions from incoming data
    new_versions = changed.select(
        key_cols + [f"{col}_new" for col in tracked_cols]
    ).rename({f"{col}_new": col for col in tracked_cols}).with_columns([
        pl.lit(today).alias(valid_from_col),
        pl.lit(None).cast(pl.Date).alias(valid_to_col),
        pl.lit(True).alias(is_current_col),
    ])

    # Detect genuinely new records (no existing current match)
    existing_keys = current.select(key_cols)
    brand_new = incoming.join(existing_keys, on=key_cols, how="anti").with_columns([
        pl.lit(today).alias(valid_from_col),
        pl.lit(None).cast(pl.Date).alias(valid_to_col),
        pl.lit(True).alias(is_current_col),
    ])

    # Diagonal concat aligns columns by name rather than position, so the
    # sub-frames don't need identical column order. A column missing from
    # one sub-frame is filled with null, so keep schemas in sync upstream.
    return pl.concat(
        [historical, unchanged, expired, new_versions, brand_new],
        how="diagonal",
    )

Calling It

silver = pl.read_parquet("silver/members/")
incoming = pl.read_parquet("bronze/members/latest/")

result = apply_scd2(
    existing=silver,
    incoming=incoming,
    key_cols=["member_id"],
    tracked_cols=["plan_code", "coverage_amount", "provider_id"],
)

# write_parquet targets a single file, not a directory (filename illustrative)
result.write_parquet("silver/members/members.parquet", use_pyarrow=True)

What to Watch

Composite keys: pass multiple columns in key_cols; the join handles them naturally.

Null handling: a plain != comparison in Polars returns null when either side is null, so rows with nulls can silently fall out of both the changed and unchanged branches. ne_missing compares null vs. null as equal and null vs. value as a change, which is the behavior SCD 2 needs. Test this against your own data.

Deduplication at bronze: this function assumes incoming has exactly one row per key. If your bronze layer can have duplicates (late-arriving records, duplicated feeds), deduplicate before calling this.

Write strategy: if you're on Delta Lake, use MERGE INTO instead; the ACID guarantees are worth it at scale. This pure-Polars approach is best for Parquet-based pipelines where you control the write lock.

Why Not SQL?

You could do all of this in a SQL MERGE statement. But if your pipeline is already Python-native (reading from APIs, applying business logic, running validations), keeping the SCD logic in the same layer avoids a round-trip to a SQL engine and keeps the transformation testable with pytest.

One test suite, one language, one pipeline.