Skip to content

Pipeline Patterns Overview

Why Pipeline Patterns Exist

Data doesn't magically appear in the right place, in the right format, at the right time. Getting data from where it's generated (operational systems, APIs, events) to where it's consumed (warehouses, ML models, dashboards) requires a systematic approach — a data pipeline.

Data pipelines are the plumbing of the modern data stack. They're invisible when they work and catastrophic when they don't. Pipeline patterns are the accumulated wisdom of decades of building, operating, and debugging these systems.

Historical Context

  • 1970s-1990s: ETL (Extract, Transform, Load) — data is transformed before loading into the warehouse
  • 2000s: ELT (Extract, Load, Transform) — data is loaded raw and transformed in the warehouse
  • 2010s: Lambda Architecture (batch + stream), Kappa Architecture (stream only)
  • 2015s: ELT becomes dominant with cloud warehouses (Snowflake, BigQuery)
  • 2020s: Data Mesh (decentralized), Data Contracts, dbt-driven transformations, Reverse ETL
  • 2025s: Real-time data products, event-driven architectures, AI-powered pipeline observability

First Principles

The Pipeline Abstraction

Every data pipeline, regardless of complexity, consists of:

Pipeline=SourceTransform1Transform2Sink

ETL vs. ELT

AspectETLELT
Transform locationETL server (Informatica, DataStage)In the warehouse (dbt, SQL)
ScalabilityLimited by ETL serverWarehouse compute
Raw data retentionOften lost after transformAlways available
Schema flexibilityFixed at design timeSchema-on-read possible
CostETL license + computeWarehouse compute
Dominant era2000-20152015-present

The Medallion Architecture

The most common modern pipeline layering:

typescript
interface MedallionLayer {
  name: 'bronze' | 'silver' | 'gold';
  purpose: string;
  dataQuality: string;
  consumers: string[];
  refreshPattern: string;
}

const layers: MedallionLayer[] = [
  {
    name: 'bronze',
    purpose: 'Raw ingestion — exact copy of source',
    dataQuality: 'Source fidelity only',
    consumers: ['Data engineers', 'Debug/audit'],
    refreshPattern: 'Append-only, event-driven or scheduled',
  },
  {
    name: 'silver',
    purpose: 'Cleaned, deduplicated, typed, standardized',
    dataQuality: 'Schema enforcement, null handling, dedup',
    consumers: ['Data engineers', 'Advanced analysts'],
    refreshPattern: 'Incremental merge on primary key',
  },
  {
    name: 'gold',
    purpose: 'Business-level aggregations and models',
    dataQuality: 'Business rules validated, tested',
    consumers: ['Analysts', 'Dashboards', 'ML models', 'APIs'],
    refreshPattern: 'Full rebuild or incremental depending on logic',
  },
];

Core Pipeline Patterns

Pattern 1: Change Data Capture (CDC)

Capture only the changes from source systems instead of full extracts.

See CDC Patterns for comprehensive coverage.

Pattern 2: Idempotent Pipelines

A pipeline that produces the same result regardless of how many times it runs:

f(f(x))=f(x)
typescript
interface IdempotentPipelineStep<T> {
  /**
   * Process input and produce output.
   * Running this multiple times with the same input produces the same output.
   */
  execute(input: T): Promise<T>;

  /**
   * Unique identifier for this execution.
   * Used for deduplication and retry safety.
   */
  executionId(input: T): string;
}

class IdempotentUpsert implements IdempotentPipelineStep<DataBatch> {
  constructor(private readonly db: Database) {}

  async execute(input: DataBatch): Promise<DataBatch> {
    for (const record of input.records) {
      await this.db.query(
        `INSERT INTO target_table (id, data, updated_at)
         VALUES ($1, $2, $3)
         ON CONFLICT (id) DO UPDATE
         SET data = EXCLUDED.data,
             updated_at = EXCLUDED.updated_at
         WHERE target_table.updated_at < EXCLUDED.updated_at`,
        [record.id, record.data, record.updatedAt],
      );
    }
    return input;
  }

  executionId(input: DataBatch): string {
    return `upsert-${input.batchId}`;
  }
}

interface DataBatch {
  batchId: string;
  records: Array<{ id: string; data: unknown; updatedAt: Date }>;
}

interface Database {
  query(sql: string, params: unknown[]): Promise<unknown>;
}

Pattern 3: Incremental Processing

Process only new or changed data, not the entire dataset:

typescript
interface IncrementalProcessor {
  /**
   * Get the high-water mark from the last successful run.
   */
  getHighWaterMark(): Promise<Date | number | string>;

  /**
   * Process records after the high-water mark.
   */
  processIncremental(
    afterMark: Date | number | string,
  ): Promise<{ recordsProcessed: number; newHighWaterMark: Date | number | string }>;

  /**
   * Update the high-water mark after successful processing.
   */
  updateHighWaterMark(mark: Date | number | string): Promise<void>;
}

class TimestampBasedIncremental implements IncrementalProcessor {
  constructor(
    private readonly sourceDb: Database,
    private readonly targetDb: Database,
    private readonly stateDb: Database,
    private readonly pipelineId: string,
  ) {}

  async getHighWaterMark(): Promise<Date> {
    const result = await this.stateDb.query(
      'SELECT high_water_mark FROM pipeline_state WHERE pipeline_id = $1',
      [this.pipelineId],
    );
    return (result as any).rows[0]?.high_water_mark ?? new Date('1970-01-01');
  }

  async processIncremental(
    afterMark: Date | number | string,
  ): Promise<{ recordsProcessed: number; newHighWaterMark: Date }> {
    // Extract new records
    const records = await this.sourceDb.query(
      'SELECT * FROM source_table WHERE updated_at > $1 ORDER BY updated_at',
      [afterMark],
    );

    if ((records as any).rows.length === 0) {
      return { recordsProcessed: 0, newHighWaterMark: afterMark as Date };
    }

    // Load into target
    for (const record of (records as any).rows) {
      await this.targetDb.query(
        `INSERT INTO target_table (id, data, updated_at)
         VALUES ($1, $2, $3)
         ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data, updated_at = EXCLUDED.updated_at`,
        [record.id, record.data, record.updated_at],
      );
    }

    const lastRow = (records as any).rows[(records as any).rows.length - 1];
    return {
      recordsProcessed: (records as any).rows.length,
      newHighWaterMark: lastRow.updated_at,
    };
  }

  async updateHighWaterMark(mark: Date | number | string): Promise<void> {
    await this.stateDb.query(
      `INSERT INTO pipeline_state (pipeline_id, high_water_mark, updated_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (pipeline_id)
       DO UPDATE SET high_water_mark = EXCLUDED.high_water_mark, updated_at = NOW()`,
      [this.pipelineId, mark],
    );
  }
}

Pattern 4: Full Refresh vs. Incremental

AspectFull RefreshIncremental
CorrectnessGuaranteed (rebuild from scratch)Depends on change tracking
PerformanceSlow for large datasetsFast (only delta)
ComplexitySimpleComplex (state, watermarks)
RecoveryTrivial (just re-run)Must handle edge cases
Data volumeEntire dataset every runOnly changes
Source impactHigh (full scan)Low (indexed reads)

When to use full refresh:

  • Small tables (< 10M rows)
  • Source doesn't support change tracking
  • Complex transformations where incremental is error-prone
  • Periodic reconciliation alongside incremental

Pattern 5: Dead Letter Queue

Route failed records to a separate queue for investigation instead of failing the entire pipeline:

typescript
interface DeadLetterQueue<T> {
  send(
    failedRecord: T,
    error: Error,
    metadata: {
      pipelineId: string;
      step: string;
      attemptNumber: number;
      timestamp: Date;
    },
  ): Promise<void>;

  getFailedRecords(
    pipelineId: string,
    limit: number,
  ): Promise<Array<{
    record: T;
    error: string;
    metadata: Record<string, unknown>;
  }>>;

  retry(recordId: string): Promise<void>;
}

class PipelineWithDLQ<T> {
  constructor(
    private readonly processor: (record: T) => Promise<void>,
    private readonly dlq: DeadLetterQueue<T>,
    private readonly pipelineId: string,
    private readonly maxRetries: number = 3,
  ) {}

  async processRecord(record: T): Promise<void> {
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        await this.processor(record);
        return; // Success
      } catch (error) {
        if (attempt === this.maxRetries) {
          // Max retries exceeded — send to DLQ
          await this.dlq.send(record, error as Error, {
            pipelineId: this.pipelineId,
            step: 'process',
            attemptNumber: attempt,
            timestamp: new Date(),
          });
          return; // Don't fail the pipeline
        }
        // Exponential backoff
        await new Promise((r) =>
          setTimeout(r, Math.pow(2, attempt) * 1000),
        );
      }
    }
  }
}

Pipeline Architecture Patterns

Lambda Architecture

Problem: Two codebases (batch + stream) computing the same logic.

Kappa Architecture

Advantage: Single codebase. Reprocessing = replay from the event log.

Modern Data Stack

Performance Characteristics

Pipeline Throughput Factors

Throughput=min(Source rate,Transform rate,Sink rate)

The pipeline is as fast as its slowest component.

Batch Size Optimization

Optimal batch size=2×fixed_overhead×target_throughputper_record_cost

Too small: overhead dominates. Too large: latency suffers.

Batch SizeThroughputLatencyMemory
1 (record-by-record)LowLowestLowest
100-1000MediumMediumMedium
10,000-100,000HighHigherHigher
1,000,000+HighestHighestRisk of OOM

Pipeline Latency Breakdown

Latencye2e=Textract+Ttransfer+Ttransform+Tload+Tscheduling

For scheduled batch pipelines, scheduling overhead often dominates:

ComponentTypical RangeOptimization
Scheduling0-60 min (cron)Event-driven triggers
Extract1-30 minIncremental, parallel
Transfer1-10 minCompression, streaming
Transform1-60 minIncremental, caching
Load1-10 minBulk load, staging

Edge Cases & Failure Modes

Pipeline Failure Categories

Partial Failure Handling

When a batch pipeline fails halfway through:

typescript
interface CheckpointedPipeline {
  /**
   * Save progress so we can resume from where we left off.
   */
  saveCheckpoint(state: PipelineCheckpoint): Promise<void>;
  loadCheckpoint(): Promise<PipelineCheckpoint | null>;

  /**
   * Process a batch with checkpoint-based resumability.
   */
  processWithResume(
    batches: AsyncIterable<DataBatch>,
  ): Promise<PipelineResult>;
}

interface PipelineCheckpoint {
  pipelineRunId: string;
  lastProcessedBatchId: string;
  lastProcessedOffset: number;
  timestamp: Date;
  metadata: Record<string, unknown>;
}

interface PipelineResult {
  totalBatches: number;
  successfulBatches: number;
  failedBatches: number;
  skippedBatches: number; // Already processed (from checkpoint)
}

Mathematical Foundations

Pipeline Reliability

For a pipeline with n independent stages, each with reliability ri:

Rpipeline=i=1nri

A 5-stage pipeline with each stage at 99% reliability:

R=0.995=0.95195.1%

To achieve 99.9% pipeline reliability with 5 stages, each stage needs:

r=0.9991/5=0.9998=99.98%

Throughput with Parallelism

For embarrassingly parallel pipelines:

Throughput(P)=Throughput(1)×P×η(P)

Where P is parallelism and η(P) is the efficiency factor (accounts for coordination overhead):

η(P)11+α×(P1)

Where α is the serialization fraction (Amdahl's Law applied to pipelines).

Real-World War Stories

War Story

The Midnight Pipeline Pileup

A company scheduled 200 dbt models to run at midnight. Each model ran as an independent Airflow task, all starting simultaneously. The Snowflake warehouse hit its concurrency limit, queries queued, timeouts cascaded, and 80% of models failed.

Fix:

  1. Implemented a dependency graph in dbt (models depend on upstream models)
  2. Used Airflow's pool feature to limit concurrent Snowflake queries to 20
  3. Staggered non-dependent models across a 2-hour window
  4. Pipeline completion time actually decreased from 4 hours to 2 hours (less contention)

War Story

The Incremental Pipeline That Lost Data

A team built an incremental pipeline using updated_at > last_run_timestamp. One day, the source system performed a bulk update that set updated_at to the same timestamp for 500K records. Their high-water mark logic used > (strictly greater than), not >=, causing the 500K records to be skipped.

The data gap was discovered 3 weeks later when a business user noticed missing records in a report.

Fix:

  1. Changed to >= comparison
  2. Made the pipeline idempotent (UPSERT) so re-processing is safe
  3. Added a daily reconciliation that compares source and target row counts
  4. Added data quality alerts for unusual changes in row counts

Decision Framework

Pipeline Architecture Selection

FactorBatchMicro-BatchStreaming
Latency requirementHoursMinutesSeconds
Data volumeAnyAnyHigh continuous
ComplexityLowMediumHigh
CostLowestMediumHighest
Operational overheadLowMediumHigh
Debugging easeEasiestMediumHardest

Advanced Topics

Data Mesh Pipeline Patterns

In a data mesh, pipelines are owned by domain teams:

typescript
interface DataProduct {
  name: string;
  domain: string;
  owner: string;
  inputPorts: DataPort[];
  outputPorts: DataPort[];
  sla: {
    freshness: string;
    availability: number;
    quality: number;
  };
  pipeline: PipelineDefinition;
}

interface DataPort {
  name: string;
  type: 'kafka-topic' | 'table' | 'api' | 'file';
  schema: unknown;
  accessControl: string[];
}

interface PipelineDefinition {
  type: 'batch' | 'streaming' | 'hybrid';
  schedule?: string;
  steps: PipelineStep[];
}

interface PipelineStep {
  name: string;
  type: 'extract' | 'transform' | 'load' | 'quality-check';
  config: Record<string, unknown>;
}

Reverse ETL

Moving data from the warehouse back to operational systems:

This closes the loop: operational data flows to the warehouse for analysis, and enriched data flows back to operational systems.

Pipeline as Code

Modern pipelines are defined as code, version-controlled, and tested:

typescript
// Pipeline definition as TypeScript (Dagster-style)
interface PipelineAsCode {
  name: string;
  version: string;
  assets: AssetDefinition[];
  schedules: ScheduleDefinition[];
  sensors: SensorDefinition[];
}

interface AssetDefinition {
  name: string;
  dependencies: string[];
  compute: (context: ComputeContext) => Promise<void>;
  freshness: {
    maximumLagMinutes: number;
    cronSchedule: string;
  };
  metadata: Record<string, unknown>;
}

interface ScheduleDefinition {
  name: string;
  cron: string;
  targetAssets: string[];
}

interface SensorDefinition {
  name: string;
  trigger: 'file-arrival' | 'kafka-message' | 'api-poll';
  config: Record<string, unknown>;
  targetAssets: string[];
}

interface ComputeContext {
  log: (message: string) => void;
  getConfig(key: string): unknown;
  getUpstreamData(assetName: string): Promise<unknown>;
}

Cross-References

"What I cannot create, I do not understand." — Richard Feynman