Reliable Data Pipelines

8-minute read | Published October 2025

Overview

Data pipelines move information from source systems into warehouses and analytics platforms. They extract data from databases, APIs, and event streams, transform it into usable formats, and load it into destinations where analysts and other systems consume it.

The engineering challenge is not building a pipeline that works. It is building a pipeline that fails visibly, recovers gracefully, and does not silently produce wrong data. Most pipeline failures are not crashes - they are silent data quality degradations that go undetected for days or weeks until someone notices the numbers do not add up.

Reliable data pipelines treat data quality as a first-class operational concern, not a downstream analytics problem.

Architecture

[Diagram: Data pipeline architecture with quality gates between stages. Each gate can halt the pipeline before bad data propagates.]

The architecture inserts quality gates between pipeline stages. Each gate runs validation checks and can halt the pipeline before bad data propagates downstream. Failed records are routed to a quarantine zone for inspection rather than being silently dropped or corrupting the output.

The ingestion layer handles extraction from source systems and writes raw data to a staging area. Transformation runs on staged data. Each layer is independently retryable - a transformation failure does not re-extract source data.

Key Engineering Challenges

Data Quality Gates

Quality gates run automated checks between pipeline stages. They validate schema conformance, row counts, null rates, value distributions, and metric drift.

class QualityGate:
    def __init__(self, checks):
        self.checks = checks

    def validate(self, dataset):
        # Run every check so the failure report is complete
        # even when an early check fails.
        results = [check.run(dataset) for check in self.checks]

        failures = [r for r in results if not r.passed]

        # Critical failures quarantine the batch and halt the pipeline.
        if any(f.severity == "critical" for f in failures):
            self.quarantine(dataset, failures)
            raise QualityGateFailure(failures)

        # Non-critical failures alert but let the run continue.
        if failures:
            self.alert(failures)

        return results


# Example checks
checks = [
    RowCountCheck(min=1000, max=1_000_000),
    NullRateCheck(column="user_id", max_rate=0.01),
    SchemaCheck(expected_columns=["user_id", "amount", "ts"]),
    MetricDriftCheck(
        column="amount",
        metric="mean",
        max_deviation_pct=20,
    ),
]

Critical failures halt the pipeline and quarantine the data. Non-critical failures (like a moderate increase in null rates) send alerts but allow the pipeline to continue. The severity classification is configured per check, reflecting the business impact of each data quality issue.
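An individual check with configurable severity can be sketched like this. The `CheckResult` dataclass is an assumed shape for the result objects `QualityGate` inspects, not something defined above:

```python
from dataclasses import dataclass

@dataclass
class CheckResult:
    passed: bool
    severity: str
    message: str

class NullRateCheck:
    def __init__(self, column, max_rate, severity="critical"):
        self.column = column
        self.max_rate = max_rate
        self.severity = severity  # business impact is configured per check

    def run(self, dataset):
        # dataset is assumed to be a list of dicts in this sketch
        values = [row.get(self.column) for row in dataset]
        null_rate = values.count(None) / len(values) if values else 1.0
        return CheckResult(
            passed=null_rate <= self.max_rate,
            severity=self.severity,
            message=f"{self.column} null rate {null_rate:.2%}",
        )
```

The same column can carry a "critical" check for the pipeline feeding billing and a "warning" check for an internal dashboard.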

Schema Drift Detection

Source schemas change without notice. A proactive approach compares the current schema against the expected schema before processing:

def detect_schema_drift(dataframe, expected_schema):
    current_columns = set(dataframe.columns)
    expected_columns = set(expected_schema.keys())

    added = current_columns - expected_columns
    removed = expected_columns - current_columns

    type_changes = []
    for col in current_columns & expected_columns:
        actual_type = dataframe.schema[col].dataType
        expected_type = expected_schema[col]
        if actual_type != expected_type:
            type_changes.append({
                "column": col,
                "expected": expected_type,
                "actual": actual_type,
            })

    # Log additions first, so new columns are recorded even when the
    # same run also contains a breaking change.
    if added:
        log.warning(f"New columns detected: {added}")

    if removed or type_changes:
        raise SchemaDriftError(
            added=added,
            removed=removed,
            type_changes=type_changes,
        )

New columns trigger a warning but do not halt the pipeline - they are likely additions to the source system. Removed columns or type changes are breaking changes that halt processing. This catches schema drift at ingestion, before corrupted data reaches transformations.
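The function above assumes a Spark-style DataFrame with a `.schema` attribute. The same comparison works on plain `{column: type}` dicts, which makes the halt-versus-warn behavior easy to test in isolation:

```python
class SchemaDriftError(Exception):
    def __init__(self, added, removed, type_changes):
        self.added, self.removed, self.type_changes = added, removed, type_changes
        super().__init__(f"removed={removed}, type_changes={type_changes}")

def detect_schema_drift(current_schema, expected_schema):
    """Dict-based variant: schemas are plain {column: type-name} mappings."""
    current = set(current_schema)
    expected = set(expected_schema)
    added = current - expected
    removed = expected - current
    type_changes = [
        {"column": c, "expected": expected_schema[c], "actual": current_schema[c]}
        for c in current & expected
        if current_schema[c] != expected_schema[c]
    ]
    if removed or type_changes:
        raise SchemaDriftError(added, removed, type_changes)
    return added  # new columns: caller logs a warning, pipeline continues
```
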

Pipeline Observability

Pipeline monitoring requires different metrics than service monitoring. Instead of request latency and error rates, track:

class PipelineMetrics:
    def record_run(self, pipeline_name, run):
        self.gauge("pipeline.rows_processed",
            run.row_count,
            tags={"pipeline": pipeline_name})

        self.gauge("pipeline.duration_seconds",
            run.duration.total_seconds(),
            tags={"pipeline": pipeline_name})

        self.gauge("pipeline.freshness_seconds",
            (datetime.utcnow() - run.data_timestamp)
                .total_seconds(),
            tags={"pipeline": pipeline_name})

        # null_rates is per-column; emit one gauge point per column
        for column, rate in run.null_rates.items():
            self.gauge("pipeline.null_rate",
                rate,
                tags={"pipeline": pipeline_name,
                      "column": column})

        if run.row_count > 0:
            prev = self.get_previous_count(pipeline_name)
            if prev and prev > 0:
                drift = abs(run.row_count - prev) / prev
                self.gauge("pipeline.row_count_drift",
                    drift,
                    tags={"pipeline": pipeline_name})

Row count drift is particularly important. If a pipeline normally processes 50,000 rows and suddenly processes 5,000, something changed in the source. If it processes 500,000, there may be a duplication issue. Either way, the drift metric catches it.
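The arithmetic behind those examples, with an illustrative alert threshold (the 50% cutoff is an assumption for the sketch, not a recommendation from the code above):

```python
def row_count_drift(current, previous):
    """Relative change versus the previous run's row count."""
    if not previous:
        return None  # no baseline to compare against yet
    return abs(current - previous) / previous

def should_alert(drift, threshold=0.5):
    """Alert when the row count moved by more than the threshold."""
    return drift is not None and drift >= threshold
```

A drop from 50,000 to 5,000 rows is a drift of 0.9; a jump to 500,000 is a drift of 9.0. Both clear any sane threshold.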

Design Tradeoffs

Batch processing over streaming trades data freshness for operational simplicity. When the business needs daily metrics rather than real-time ones, batch is cheaper to build, debug, and maintain.

Fail-loud with quarantine over silent data loss. When data fails validation, the pipeline stops and quarantines the bad data rather than dropping it or pushing it through. This creates operational incidents (the pipeline is down) but prevents a worse outcome (wrong data in dashboards for days).

Quality gates over end-to-end tests. Testing the full pipeline end-to-end with synthetic data catches some issues but misses production-specific problems like schema drift, volume spikes, and encoding changes. Quality gates on production data catch real problems in real time.

Idempotent pipeline stages over exactly-once processing. Each pipeline stage writes output to a new partition or table suffix. If a stage needs to be rerun, it overwrites the output completely rather than appending. This makes every stage safely retryable without deduplication logic.

Lessons Learned

Silent failures are the most expensive kind of pipeline failure. A pipeline that crashes at 3 AM gets fixed at 3 AM. A pipeline that produces subtly wrong data gets fixed weeks later, after decisions were made on those numbers. Invest in quality gates early.

Row count monitoring catches more issues than any other single metric. A sudden change in volume always means something changed - a source schema change, a failed upstream job, a date filter bug. Alert on significant row count drift.

Schema drift is not a bug, it is a fact of life. Source systems evolve. Building pipelines that detect and adapt to schema changes is more sustainable than trying to prevent source systems from changing.

Data freshness is an SLA, not a best-effort metric. If the business needs numbers by 8 AM, the pipeline must reliably complete by 7:30 AM. Treat this deadline with the same seriousness as service uptime SLAs.
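Checked as code, the deadline becomes a pass/fail assertion rather than a dashboard curiosity. The 7:30 AM cutoff mirrors the example above; completion timestamps are assumed to share one timezone:

```python
from datetime import datetime, time

def freshness_sla_met(completed_at, deadline=time(7, 30)):
    """True if the run finished at or before the daily deadline."""
    return completed_at.time() <= deadline
```
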