Unverified: AI-generated content.

Exactly-Once Stream Processing

Why Exactly-Once Exists

In distributed systems, three delivery guarantees are commonly discussed:

  1. At-most-once: Fire and forget. Events may be lost.
  2. At-least-once: Retry on failure. Events may be duplicated.
  3. Exactly-once: Each event is processed precisely once — no loss, no duplication.

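The first two are straightforward to implement. Exactly-once is notoriously difficult because distributed systems face network partitions, node crashes, and partial failures, and a sender that receives no acknowledgment cannot tell whether the message or the acknowledgment was lost. The famous "Two Generals' Problem" formalizes this: over an unreliable channel, no finite exchange of messages lets two parties be certain they agree.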

Yet streaming applications — financial transactions, billing events, inventory management — require exactly-once semantics. The question is: how do we achieve it in practice?

Historical Context

Early stream processors (Storm, S4) provided at-most-once or at-least-once guarantees. Storm's Trident layer added exactly-once through micro-batching, but with significant latency overhead. Apache Flink (2015) introduced distributed snapshots based on the Chandy-Lamport algorithm, enabling exactly-once without micro-batching. Kafka Streams achieved exactly-once through Kafka's idempotent producer and transactional APIs (KIP-98, 2017). Today, exactly-once is a solved problem for most practical use cases, but understanding the mechanics is essential for correct system design.

The Key Insight

True exactly-once is impossible in the general case (you cannot prevent a side effect from happening more than once if the side effect is external). What streaming systems actually provide is effectively-once — the combination of:

  1. Exactly-once state updates (via checkpointing)
  2. Exactly-once output (via transactional sinks or idempotent writes)

First Principles

The Checkpoint Model

A checkpoint is a consistent snapshot of the entire pipeline state at a logical point in time:

$$\text{Checkpoint } C_n = \{S_1^n, S_2^n, \ldots, S_k^n, O_1^n, O_2^n, \ldots, O_m^n\}$$

Where:

  • $S_i^n$ = state of operator $i$ at checkpoint $n$
  • $O_j^n$ = offset/position of source $j$ at checkpoint $n$

On failure, the system restores from the latest successful checkpoint and replays events from the source offsets:

$$\text{Recovery:}\quad \forall i: S_i \leftarrow S_i^n, \qquad \forall j: \text{seek}(j, O_j^n)$$
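A minimal in-memory sketch of the checkpoint model and the recovery rule above, assuming one counting operator and one replayable source. The `ReplayableSource` and `CountingOperator` classes are hypothetical, not a framework API. The event processed after the checkpoint is discarded by the restore and then replayed, so the final counts match a failure-free run.

```typescript
// Hypothetical replayable source: an append-only log read by offset (O^n).
class ReplayableSource {
  private offset = 0;
  constructor(private readonly log: string[]) {}
  poll(): string | undefined {
    return this.offset < this.log.length ? this.log[this.offset++] : undefined;
  }
  position(): number {
    return this.offset;
  }
  seek(offset: number): void {
    this.offset = offset;
  }
}

// Hypothetical operator whose state S_i is a per-key count.
class CountingOperator {
  counts = new Map<string, number>();
  process(key: string): void {
    this.counts.set(key, (this.counts.get(key) ?? 0) + 1);
  }
  snapshot(): Map<string, number> {
    return new Map(this.counts); // copy, so later updates don't leak in
  }
  restore(state: Map<string, number>): void {
    this.counts = new Map(state);
  }
}

interface Checkpoint {
  operatorState: Map<string, number>; // S^n
  sourceOffset: number; // O^n
}

const source = new ReplayableSource(['a', 'b', 'a', 'c', 'a']);
const op = new CountingOperator();

// Process two events, then take checkpoint C_n.
for (let i = 0; i < 2; i++) op.process(source.poll()!);
const cn: Checkpoint = { operatorState: op.snapshot(), sourceOffset: source.position() };

// Process one more event, then simulate a failure: this update is not in C_n.
op.process(source.poll()!);

// Recovery: S_i <- S_i^n and seek(j, O_j^n), then replay to the end of the log.
op.restore(cn.operatorState);
source.seek(cn.sourceOffset);
for (let e = source.poll(); e !== undefined; e = source.poll()) op.process(e);

console.log(Object.fromEntries(op.counts)); // { a: 3, b: 1, c: 1 }
```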

The Chandy-Lamport Algorithm

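Flink's checkpointing is built on asynchronous barrier snapshotting, a variant of the Chandy-Lamport distributed snapshot algorithm in which barrier markers flow through the same channels as the data.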

Key properties:

  1. Barriers are injected into the data stream — no pause needed
  2. Each operator snapshots state when it receives the barrier — asynchronous
  3. Barriers align at operators with multiple inputs — ensures consistency
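To make the first two properties concrete, here is a minimal single-input sketch, assuming a stream modeled as an array of records and barrier markers (the `StreamItem` type and `runOperator` function are hypothetical). The operator never pauses: it records its state at the moment a barrier passes and forwards the barrier downstream, so the snapshot for checkpoint n reflects exactly the records ahead of barrier n.

```typescript
type StreamItem =
  | { kind: 'record'; value: number }
  | { kind: 'barrier'; checkpointId: number };

// Sums records; on each barrier, stores the current sum as the snapshot
// for that checkpoint and forwards the barrier unchanged.
function runOperator(input: StreamItem[]): {
  snapshots: Map<number, number>;
  output: StreamItem[];
} {
  let sum = 0;
  const snapshots = new Map<number, number>();
  const output: StreamItem[] = [];
  for (const item of input) {
    if (item.kind === 'record') {
      sum += item.value;
      output.push({ kind: 'record', value: sum });
    } else {
      snapshots.set(item.checkpointId, sum); // state as of the barrier
      output.push(item); // forward barrier downstream
    }
  }
  return { snapshots, output };
}

const { snapshots } = runOperator([
  { kind: 'record', value: 1 },
  { kind: 'record', value: 2 },
  { kind: 'barrier', checkpointId: 1 },
  { kind: 'record', value: 3 },
  { kind: 'barrier', checkpointId: 2 },
]);
console.log(snapshots); // Map(2) { 1 => 3, 2 => 6 }
```

With several inputs this is no longer enough, because records from a fast input could arrive after its barrier but before the barrier of a slow input; that is what alignment handles.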

Barrier Alignment

For operators with multiple inputs, barrier alignment ensures that the state snapshot is consistent:

typescript
enum AlignmentState {
  WAITING_FOR_BARRIERS = 'WAITING_FOR_BARRIERS',
  ALL_BARRIERS_RECEIVED = 'ALL_BARRIERS_RECEIVED',
}

interface CheckpointBarrier {
  checkpointId: number;
  timestamp: number;
}

class BarrierAligner {
  private receivedBarriers: Set<string> = new Set();
  private blockedChannels: Set<string> = new Set();
  private bufferedElements: Map<string, unknown[]> = new Map();

  constructor(private readonly inputChannels: string[]) {}

  /**
   * Process an incoming barrier from one input channel.
   * Returns aligned = true, together with the elements each channel
   * buffered after its barrier, once barriers from all inputs have arrived.
   */
  processBarrier(
    channelId: string,
    barrier: CheckpointBarrier,
  ): { aligned: boolean; bufferedData: Map<string, unknown[]> } {
    this.receivedBarriers.add(channelId);
    this.blockedChannels.add(channelId);

    if (this.receivedBarriers.size === this.inputChannels.length) {
      // All barriers received: the caller snapshots state, forwards the
      // barrier downstream, then processes the returned buffered elements
      const buffered = new Map(this.bufferedElements);
      this.reset();
      return { aligned: true, bufferedData: buffered };
    }

    return { aligned: false, bufferedData: new Map() };
  }

  /**
   * Buffer data from channels that have already sent their barrier.
   * Data from non-blocked channels is processed normally.
   */
  processElement(channelId: string, element: unknown): boolean {
    if (this.blockedChannels.has(channelId)) {
      // Buffer this element — it belongs to checkpoint n+1
      const buffer = this.bufferedElements.get(channelId) ?? [];
      buffer.push(element);
      this.bufferedElements.set(channelId, buffer);
      return false; // Element not processed yet
    }
    return true; // Process normally — belongs to checkpoint n
  }

  private reset(): void {
    this.receivedBarriers.clear();
    this.blockedChannels.clear();
    this.bufferedElements.clear();
  }
}

WARNING

Barrier alignment causes backpressure on channels that have received their barrier. This is the cost of exactly-once with aligned barriers. Flink 1.11+ introduced unaligned checkpoints that avoid this at the cost of larger checkpoint sizes.

Unaligned Checkpoints

Aligned checkpoints block fast channels while waiting for slow ones. Unaligned checkpoints avoid this: the barrier overtakes the records still buffered in the channels, and those in-flight records are stored as part of the checkpoint.

Tradeoffs:

| Aspect | Aligned | Unaligned |
|---|---|---|
| Checkpoint latency | High under skew | Low, predictable |
| Checkpoint size | Smaller | Larger (includes buffers) |
| Processing latency | Spikes during alignment | Smooth |
| Recovery time | Faster (less state) | Slower (more state to restore) |

typescript
interface UnalignedCheckpointState {
  operatorState: Uint8Array;
  inFlightBuffers: Map<string, Uint8Array[]>; // 'in:<channel>' | 'out:<channel>' -> buffered elements
  sourceOffsets: Map<string, number>;
}

class UnalignedCheckpointCoordinator {
  /**
   * On receiving a barrier from ANY input, immediately:
   * 1. Snapshot operator state
   * 2. Capture all buffered elements in input/output channels
   * 3. Forward barrier to all outputs, overtaking the buffered elements
   */
  onBarrier(
    channelId: string,
    barrier: CheckpointBarrier,
    operatorState: Uint8Array,
    inputBuffers: Map<string, Uint8Array[]>,
    outputBuffers: Map<string, Uint8Array[]>,
  ): UnalignedCheckpointState {
    // Prefix keys so an input and an output channel with the same ID do not
    // overwrite each other, and copy the arrays so elements buffered after
    // the barrier cannot leak into this snapshot.
    const inFlightBuffers = new Map<string, Uint8Array[]>();
    for (const [ch, bufs] of inputBuffers) inFlightBuffers.set(`in:${ch}`, [...bufs]);
    for (const [ch, bufs] of outputBuffers) inFlightBuffers.set(`out:${ch}`, [...bufs]);
    return {
      operatorState,
      inFlightBuffers,
      sourceOffsets: new Map(), // populated by source operators
    };
  }

  /**
   * On recovery, restore in-flight data to the correct channels
   * before resuming processing.
   */
  restore(state: UnalignedCheckpointState): void {
    // 1. Restore operator state
    // 2. Re-inject buffered elements into their channels
    // 3. Seek sources to saved offsets
    for (const [channelKey, buffers] of state.inFlightBuffers) {
      for (const buffer of buffers) {
        this.reinjectToChannel(channelKey, buffer);
      }
    }
  }

  private reinjectToChannel(_channelKey: string, _data: Uint8Array): void {
    // Implementation: strip the 'in:'/'out:' prefix and push data back
    // into that channel's buffer
  }
}

Two-Phase Commit for Sinks

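Checkpointing handles internal state, but what about external systems? Suppose a sink writes each record to an external system as soon as it is produced. If the job fails before the next checkpoint completes, recovery rewinds to the previous checkpoint and replays the events since then, so the external system receives those records a second time (duplicates).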

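The two-phase commit protocol makes sinks participate in the checkpoint. In phase 1 (pre-commit), when the checkpoint barrier reaches the sink, it flushes its open transaction and starts a new one for subsequent records. In phase 2 (commit), once the checkpoint coordinator confirms that every operator has completed the checkpoint, the sink commits the pre-committed transaction. If the job instead fails and restarts from an earlier checkpoint, uncommitted transactions are aborted and their records are replayed from the source.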

Implementation: Kafka Exactly-Once Sink

typescript
interface KafkaTransaction {
  id: string;
  state: 'active' | 'pre-committed' | 'committed' | 'aborted';
  records: Array<{ topic: string; key: string; value: string }>;
  checkpointId?: number; // set at pre-commit: the checkpoint this transaction belongs to
}

class ExactlyOnceKafkaSink {
  private currentTransaction: KafkaTransaction | null = null;
  private pendingTransactions: KafkaTransaction[] = [];
  private committedCheckpoints: Set<number> = new Set();

  constructor(
    private readonly transactionalId: string,
    private readonly producer: KafkaTransactionalProducer,
  ) {}

  /**
   * Called for each output record.
   * Writes within the current transaction.
   */
  async write(record: { topic: string; key: string; value: string }): Promise<void> {
    if (!this.currentTransaction) {
      this.currentTransaction = await this.beginTransaction();
    }
    this.currentTransaction.records.push(record);
    await this.producer.send(record, this.currentTransaction.id);
  }

  /**
   * Phase 1: Pre-commit on checkpoint barrier.
   * Flush all buffered records, tag the transaction with the checkpoint ID,
   * and mark it as pre-committed. The transaction stays open in Kafka.
   */
  async preCommit(checkpointId: number): Promise<void> {
    if (!this.currentTransaction) return;

    await this.producer.flush(this.currentTransaction.id);
    this.currentTransaction.state = 'pre-committed';
    this.currentTransaction.checkpointId = checkpointId;
    this.pendingTransactions.push(this.currentTransaction);

    // Start a new transaction for the next checkpoint epoch
    this.currentTransaction = await this.beginTransaction();
  }

  /**
   * Phase 2: Commit when checkpoint is confirmed complete.
   * Commits every pending transaction that belongs to this checkpoint or an
   * earlier one (completion notifications can be missed or subsumed by a
   * later checkpoint). Transactions of later checkpoints stay pending.
   */
  async commit(checkpointId: number): Promise<void> {
    this.committedCheckpoints.add(checkpointId);

    for (const tx of this.pendingTransactions) {
      if (
        tx.state === 'pre-committed' &&
        tx.checkpointId !== undefined &&
        tx.checkpointId <= checkpointId
      ) {
        await this.producer.commitTransaction(tx.id);
        tx.state = 'committed';
      }
    }

    // Clean up committed transactions
    this.pendingTransactions = this.pendingTransactions.filter(
      (tx) => tx.state !== 'committed',
    );
  }

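  /**
   * Abort when the job restarts from an earlier checkpoint: discard every
   * transaction that was not committed (pre-committed or still active).
   * Their records are regenerated by replaying the source from the restored
   * offsets. This simplified sketch assumes all open transactions postdate
   * the restored checkpoint.
   *
   * A checkpoint that merely fails, without a restart, should NOT abort
   * pre-committed transactions: the next successful checkpoint commits
   * them, and aborting them here would lose their records.
   */
  async abort(_checkpointId: number): Promise<void> {
    const open = [...this.pendingTransactions];
    if (this.currentTransaction) open.push(this.currentTransaction);
    for (const tx of open) {
      await this.producer.abortTransaction(tx.id);
      tx.state = 'aborted';
    }
    this.pendingTransactions = [];
    this.currentTransaction = null;
  }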

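  private nextTxSeq = 0;

  private async beginTransaction(): Promise<KafkaTransaction> {
    // Deterministic, monotonically increasing IDs: Date.now() can collide
    // when two transactions start within the same millisecond
    const id = `${this.transactionalId}-${this.nextTxSeq++}`;
    await this.producer.beginTransaction(id);
    return { id, state: 'active', records: [] };
  }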
}

// Hypothetical producer interface used by this sketch (not a real Kafka client API)
interface KafkaTransactionalProducer {
  beginTransaction(id: string): Promise<void>;
  send(record: { topic: string; key: string; value: string }, txId: string): Promise<void>;
  flush(txId: string): Promise<void>;
  commitTransaction(id: string): Promise<void>;
  abortTransaction(id: string): Promise<void>;
}

Idempotent Sinks

An alternative to two-phase commit is making writes idempotent: writing the same data twice has the same effect as writing it once.

$$f(f(x)) = f(x) \quad \text{(idempotency)}$$

Idempotent Database Writes

typescript
interface IdempotentWrite {
  // Unique identifier derived deterministically from the input
  idempotencyKey: string;
  // The data to write
  payload: Record<string, unknown>;
}

class IdempotentDatabaseSink {
  constructor(private readonly db: Database) {}

  /**
   * Upsert pattern: INSERT ... ON CONFLICT DO UPDATE
   * The idempotency key ensures re-processing produces the same result.
   */
  async write(record: IdempotentWrite): Promise<void> {
    await this.db.query(
      `INSERT INTO output_table (idempotency_key, data, updated_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (idempotency_key)
       DO UPDATE SET data = EXCLUDED.data, updated_at = NOW()`,
      [record.idempotencyKey, JSON.stringify(record.payload)],
    );
  }

  /**
   * Generate deterministic idempotency key from event data.
   * CRITICAL: Must be the same across re-processing.
   */
  static generateKey(
    eventId: string,
    windowStart: number,
    windowEnd: number,
  ): string {
    return `${eventId}:${windowStart}:${windowEnd}`;
  }
}

interface Database {
  query(sql: string, params: unknown[]): Promise<void>;
}
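The property the upsert relies on can be checked without a database. A minimal sketch, assuming a `Map` as a stand-in for `output_table` keyed by `idempotency_key` (the `InMemoryUpsertTable` class and `keyFor` helper are hypothetical): replaying the same batch after a simulated recovery leaves the table exactly as it was after the first pass.

```typescript
// Stand-in for output_table: key -> JSON payload (INSERT ... ON CONFLICT DO UPDATE).
class InMemoryUpsertTable {
  readonly rows = new Map<string, string>();
  upsert(key: string, payload: Record<string, unknown>): void {
    this.rows.set(key, JSON.stringify(payload)); // insert or overwrite
  }
}

// Same key scheme as IdempotentDatabaseSink.generateKey
const keyFor = (eventId: string, start: number, end: number): string =>
  `${eventId}:${start}:${end}`;

const batch = [
  { eventId: 'e1', start: 0, end: 3600, total: 42 },
  { eventId: 'e2', start: 0, end: 3600, total: 7 },
];

const table = new InMemoryUpsertTable();
for (const r of batch) table.upsert(keyFor(r.eventId, r.start, r.end), { total: r.total });
const afterFirstPass = new Map(table.rows);

// Recovery replays the same batch: every write hits an existing key.
for (const r of batch) table.upsert(keyFor(r.eventId, r.start, r.end), { total: r.total });

console.log(table.rows.size); // 2
```

In the SQL version, `updated_at = NOW()` changes on every replay, so only the `data` column is strictly idempotent; downstream logic should not depend on `updated_at` to detect new results.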

Idempotent vs. Transactional Sinks

| Aspect | Transactional (2PC) | Idempotent |
|---|---|---|
| Complexity | High | Medium |
| Latency | Higher (output visible only after the checkpoint commits) | Lower |
| Supported sinks | Must support transactions | Must support upserts |
| Exactly-once scope | Full pipeline | Per-sink |
| Recovery overhead | Rollback uncommitted | Re-write (no-op) |

Checkpoint Storage & Performance

Checkpoint Size Estimation

$$\text{Checkpoint size} = \sum_{i=1}^{k} |S_i| + \sum_{j=1}^{m} |\text{offset}_j| + \text{metadata}$$

Where:

  • $|S_i|$ = serialized size of operator $i$'s state
  • Key-value state: $|S_i| = \sum_{\text{keys}} (|\text{key}| + |\text{value}|)$
  • Window state: $|S_i| = |\text{windows}| \times \text{avg\_window\_state}$

Checkpoint Duration

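$$\begin{aligned}
T_{\text{checkpoint}} &= T_{\text{barrier\_propagation}} + T_{\text{snapshot}} + T_{\text{upload}} \\
T_{\text{barrier\_propagation}} &= \frac{\text{pipeline\_depth}}{\text{throughput}} + \text{alignment\_delay} \\
T_{\text{upload}} &= \frac{\text{checkpoint\_size}}{\text{upload\_bandwidth}}
\end{aligned}$$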
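Plugging numbers into the size and duration formulas above gives a quick sanity check. This sketch assumes illustrative inputs only: 10 million keys with 32-byte keys and 68-byte values, 12 source partitions with 8-byte offsets, 4 KB of metadata, 50,000 records in flight at 100,000 records/s, a 0.5 s alignment delay, a 2 s snapshot, and 100 MB/s upload bandwidth. `estimateCheckpoint` is a hypothetical helper.

```typescript
interface CheckpointEstimateInput {
  operatorStateBytes: number[]; // |S_i| for each operator
  sourceOffsetBytes: number[]; // |offset_j| for each source partition
  metadataBytes: number;
  pipelineDepthRecords: number; // records in flight ahead of the barrier
  throughputRecordsPerSec: number;
  alignmentDelaySec: number;
  snapshotSec: number;
  uploadBytesPerSec: number;
}

function estimateCheckpoint(x: CheckpointEstimateInput): { sizeBytes: number; durationSec: number } {
  const sum = (xs: number[]) => xs.reduce((a, b) => a + b, 0);
  const sizeBytes = sum(x.operatorStateBytes) + sum(x.sourceOffsetBytes) + x.metadataBytes;
  const barrierSec = x.pipelineDepthRecords / x.throughputRecordsPerSec + x.alignmentDelaySec;
  const uploadSec = sizeBytes / x.uploadBytesPerSec;
  return { sizeBytes, durationSec: barrierSec + x.snapshotSec + uploadSec };
}

// Key-value operator: 10M keys x (32 B key + 68 B value) = 1e9 B
const kvState = 10_000_000 * (32 + 68);

const est = estimateCheckpoint({
  operatorStateBytes: [kvState],
  sourceOffsetBytes: new Array<number>(12).fill(8), // 12 partitions, 8-byte offsets
  metadataBytes: 4_000,
  pipelineDepthRecords: 50_000,
  throughputRecordsPerSec: 100_000,
  alignmentDelaySec: 0.5,
  snapshotSec: 2,
  uploadBytesPerSec: 100_000_000, // 100 MB/s
});
console.log(est.durationSec.toFixed(1)); // 13.0
```

The upload term (about 10 of the 13 seconds) dominates here, which is the term incremental checkpoints shrink.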

Incremental Checkpoints

Full checkpoints serialize all state every time. Incremental checkpoints only serialize changes since the last checkpoint:

$$\text{Incremental size} = |\Delta S| = |S_n \setminus S_{n-1}|$$

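RocksDB stores state in immutable SST files, so an incremental checkpoint can upload only the SST files created since the previous checkpoint and reference the files that earlier checkpoints already uploaded: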

typescript
interface IncrementalCheckpoint {
  checkpointId: number;
  // New SST files created since last checkpoint
  newSstFiles: string[];
  // SST files from previous checkpoints still needed
  referencedSstFiles: Map<number, string[]>; // checkpointId -> files
  // Files that can be garbage collected
  obsoleteSstFiles: string[];
}

class IncrementalCheckpointManager {
  private checkpointHistory: IncrementalCheckpoint[] = [];

  async createIncrementalCheckpoint(
    currentCheckpointId: number,
    allSstFiles: string[],
    previousSstFiles: Set<string>,
  ): Promise<IncrementalCheckpoint> {
    const newFiles = allSstFiles.filter((f) => !previousSstFiles.has(f));
    const referencedFiles = allSstFiles.filter((f) => previousSstFiles.has(f));

    const checkpoint: IncrementalCheckpoint = {
      checkpointId: currentCheckpointId,
      newSstFiles: newFiles,
      referencedSstFiles: this.resolveReferences(referencedFiles),
      obsoleteSstFiles: this.findObsoleteFiles(allSstFiles),
    };

    // Only upload new SST files
    for (const file of newFiles) {
      await this.uploadToCheckpointStorage(file);
    }

    this.checkpointHistory.push(checkpoint);
    return checkpoint;
  }

  private resolveReferences(
    _files: string[],
  ): Map<number, string[]> {
    // Map each referenced file to the checkpoint that created it
    return new Map();
  }

  private findObsoleteFiles(_currentFiles: string[]): string[] {
    // Files in old checkpoints not referenced by any active checkpoint
    return [];
  }

  private async uploadToCheckpointStorage(_file: string): Promise<void> {
    // Upload to S3, HDFS, etc.
  }
}

Benchmark: Checkpoint Performance

| State Size | Full Checkpoint | Incremental Checkpoint | Ratio |
|---|---|---|---|
| 1 GB | 5 s | 0.5 s | 10x |
| 10 GB | 45 s | 2 s | 22x |
| 100 GB | 8 min | 10 s | 48x |
| 1 TB | 80 min | 30 s | 160x |

TIP

Always use incremental checkpoints for state sizes > 1 GB. The savings grow with state size because an incremental checkpoint uploads only what changed since the previous checkpoint, and most state is unchanged between checkpoints.

Edge Cases & Failure Modes

Checkpoint Timeout

If a checkpoint takes too long, the system must decide whether to fail the job or continue without that checkpoint:

typescript
interface CheckpointConfig {
  intervalMs: number;          // How often to trigger checkpoints
  timeoutMs: number;           // Max time for a checkpoint to complete
  minPauseBetweenMs: number;   // Min time between checkpoints
  maxConcurrentCheckpoints: number;
  tolerableFailureCount: number; // Consecutive failures tolerated; the next one fails the job
  externalizedCheckpoint: boolean; // Keep checkpoint on cancellation
}

const productionConfig: CheckpointConfig = {
  intervalMs: 60_000,           // Every minute
  timeoutMs: 600_000,          // 10 minute timeout
  minPauseBetweenMs: 30_000,   // At least 30s between
  maxConcurrentCheckpoints: 1, // No overlap
  tolerableFailureCount: 3,    // Tolerate 3 consecutive failures; the 4th fails the job
  externalizedCheckpoint: true,
};
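How `intervalMs`, `minPauseBetweenMs`, and `tolerableFailureCount` interact can be sketched as a small scheduler. This is a simplified model, assuming `maxConcurrentCheckpoints: 1`, assuming the pause is measured from the end of one checkpoint to the start of the next, and assuming `tolerableFailureCount` counts tolerated failures so the job fails on the one after. The `CheckpointScheduler` class is hypothetical.

```typescript
interface SchedulerConfig {
  intervalMs: number;
  minPauseBetweenMs: number;
  tolerableFailureCount: number;
}

class CheckpointScheduler {
  private lastTriggerMs = -Infinity;
  private lastEndMs = -Infinity;
  private consecutiveFailures = 0;

  constructor(private readonly cfg: SchedulerConfig) {}

  /** Earliest time the next checkpoint may start (one checkpoint at a time). */
  nextTriggerAt(): number {
    return Math.max(
      this.lastTriggerMs + this.cfg.intervalMs,
      this.lastEndMs + this.cfg.minPauseBetweenMs,
    );
  }

  onTriggered(nowMs: number): void {
    this.lastTriggerMs = nowMs;
  }

  /** Returns false when the job should fail (too many consecutive failures). */
  onFinished(nowMs: number, succeeded: boolean): boolean {
    this.lastEndMs = nowMs;
    this.consecutiveFailures = succeeded ? 0 : this.consecutiveFailures + 1;
    return this.consecutiveFailures <= this.cfg.tolerableFailureCount;
  }
}

const cfg = { intervalMs: 60_000, minPauseBetweenMs: 30_000, tolerableFailureCount: 3 };

const sched = new CheckpointScheduler(cfg);
sched.onTriggered(0);
sched.onFinished(50_000, true); // a slow 50 s checkpoint
console.log(sched.nextTriggerAt()); // 80000: the min pause, not the interval, decides

const flaky = new CheckpointScheduler(cfg);
console.log([1, 2, 3, 4].map((t) => flaky.onFinished(t * 1_000, false))); // [ true, true, true, false ]
```

The min pause is what keeps a slow checkpoint from being followed immediately by another one, leaving the job some time to make progress between checkpoints.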

The Zombie Transaction Problem

After recovery, old sink transactions may still be pending in external systems:

Timeline:
1. Checkpoint n committed → Kafka transaction T_n committed
2. Processing continues, Kafka transaction T_{n+1} started
3. FAILURE! Before checkpoint n+1 completes
4. Recovery from checkpoint n
5. T_{n+1} is now a "zombie" — pre-committed but never committed

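Problem: while T_{n+1} stays open, Kafka consumers with read_committed
cannot read past it (the last stable offset stops advancing) until the
transaction times out and is aborted (the broker caps the timeout via
transaction.max.timeout.ms, 15 min by default). Worse, if the old
producer instance is still alive, it could commit T_{n+1} after the
replay has already written the same records again.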

Solution: Fencing with epoch numbers

typescript
class FencedTransactionalProducer {
  // Mirrors the producer epoch for illustration; in Kafka the broker's
  // transaction coordinator assigns and bumps the epoch.
  private epoch: number = 0;
  private readonly transactionalId: string = 'my-sink';

  /**
   * On recovery, re-initialize with the SAME transactional.id.
   * The coordinator bumps the epoch, aborts any transaction left open by
   * the previous epoch, and rejects writes from producers with a lower epoch.
   */
  async recover(): Promise<void> {
    this.epoch++;
    // Reusing the id is what enables fencing: an id with the epoch appended
    // would register an unrelated producer and leave the zombie untouched.
    await this.initializeTransactionalProducer(this.transactionalId);
  }

  private async initializeTransactionalProducer(_id: string): Promise<void> {
    // e.g. Java KafkaProducer.initTransactions() with transactional.id = _id:
    // the broker fences older producers with the same transactional.id
  }
}

Split-Brain During Checkpoint

If a network partition occurs during checkpointing, some operators may acknowledge while others don't:

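$$\begin{aligned}
\text{Operators} &= \{O_1, O_2, O_3, O_4\} \\
\text{Partition 1} &= \{O_1, O_2\} \rightarrow \text{ack'd checkpoint} \\
\text{Partition 2} &= \{O_3, O_4\} \rightarrow \text{timed out}
\end{aligned}$$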

The checkpoint coordinator must receive ALL acknowledgments to consider the checkpoint complete. Partial acknowledgment means checkpoint failure — safe, but wastes the work done by the acknowledged operators.
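The all-or-nothing rule can be sketched as a pending-checkpoint tracker, assuming the coordinator knows the full set of task IDs and a deadline derived from the checkpoint timeout. The `PendingCheckpoint` class is hypothetical; the demo replays the partition scenario above.

```typescript
type CheckpointStatus = 'pending' | 'completed' | 'failed';

class PendingCheckpoint {
  private readonly acked = new Set<string>();
  private status: CheckpointStatus = 'pending';

  constructor(
    readonly checkpointId: number,
    private readonly expectedTasks: ReadonlySet<string>,
    private readonly deadlineMs: number,
  ) {}

  /** Record an acknowledgment; completes only when every task has acked. */
  acknowledge(taskId: string, nowMs: number): CheckpointStatus {
    if (this.status !== 'pending') return this.status;
    if (nowMs > this.deadlineMs) {
      this.status = 'failed';
      return this.status;
    }
    if (this.expectedTasks.has(taskId)) this.acked.add(taskId);
    if (this.acked.size === this.expectedTasks.size) this.status = 'completed';
    return this.status;
  }

  /** Called by a timer: any ack still missing after the deadline fails the checkpoint. */
  checkTimeout(nowMs: number): CheckpointStatus {
    if (this.status === 'pending' && nowMs > this.deadlineMs) this.status = 'failed';
    return this.status;
  }
}

// O1 and O2 ack; O3 and O4 are cut off by the partition and never do.
const cp = new PendingCheckpoint(7, new Set(['O1', 'O2', 'O3', 'O4']), 10_000);
cp.acknowledge('O1', 1_000);
cp.acknowledge('O2', 1_200);
console.log(cp.checkTimeout(10_001)); // failed
```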

Exactly-Once With Side Effects

The hardest case: your processing function has external side effects:

typescript
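// Hypothetical event type and helpers, declared so the examples type-check
interface Event {
  userId: string;
  orderId: string;
}
declare function sendEmail(userId: string, message: string): Promise<void>;
declare function sendEmailIdempotent(emailId: string, userId: string, message: string): Promise<void>;
declare function updateState(event: Event): Promise<void>;
declare function bufferSideEffect(effect: {
  type: string;
  key: string;
  payload: Record<string, unknown>;
}): Promise<void>;

// DANGEROUS: This breaks exactly-once
async function processEvent(event: Event): Promise<void> {
  await sendEmail(event.userId, 'Your order is confirmed'); // Side effect!
  await updateState(event); // State update
  // If a failure occurs anywhere between this email and the next completed
  // checkpoint, recovery replays the event and the email is sent AGAIN
}

// CORRECT: Make side effects idempotent
async function processEventCorrectly(event: Event): Promise<void> {
  const emailId = `order-confirm-${event.orderId}`; // Deterministic ID
  // Assumes the email service drops a second request with the same emailId
  await sendEmailIdempotent(emailId, event.userId, 'Your order is confirmed');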
  await updateState(event);
}

// Or: defer side effects to the commit phase
async function processEventDeferred(event: Event): Promise<void> {
  await updateState(event);
  await bufferSideEffect({
    type: 'email',
    key: `order-confirm-${event.orderId}`,
    payload: { userId: event.userId, message: 'Your order is confirmed' },
  });
  // Side effects executed only after checkpoint commits
}

DANGER

HTTP calls, emails, SMS, push notifications — any non-idempotent side effect will be duplicated on recovery. Either make them idempotent (with unique keys) or defer them to after checkpoint confirmation.
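The defer-until-commit option can be sketched as a small outbox. This sketch assumes side effects are grouped by the checkpoint that will cover them, that the outbox contents are part of the operator's checkpointed state, and that the downstream service still deduplicates by `key` in case a crash interrupts dispatch. The `SideEffectOutbox` class and its `dispatch` callback are hypothetical.

```typescript
interface SideEffect {
  type: string;
  key: string; // deterministic dedup key, e.g. `order-confirm-${orderId}`
  payload: Record<string, unknown>;
}

class SideEffectOutbox {
  // epoch (the checkpoint that will cover the effect) -> buffered effects
  private pending = new Map<number, SideEffect[]>();
  private epoch = 1;

  constructor(private readonly dispatch: (e: SideEffect) => void) {}

  buffer(effect: SideEffect): void {
    const list = this.pending.get(this.epoch) ?? [];
    list.push(effect);
    this.pending.set(this.epoch, list);
  }

  /** Barrier for checkpoint n passes: later effects belong to epoch n + 1. */
  onBarrier(checkpointId: number): void {
    this.epoch = checkpointId + 1;
  }

  /** Checkpoint n confirmed: release every effect it covers. */
  onCheckpointComplete(checkpointId: number): void {
    for (const [epoch, effects] of [...this.pending]) {
      if (epoch <= checkpointId) {
        for (const e of effects) this.dispatch(e);
        this.pending.delete(epoch);
      }
    }
  }

  /** Recovery to checkpoint n: drop effects that the replay will regenerate. */
  onRestore(checkpointId: number): void {
    for (const epoch of [...this.pending.keys()]) {
      if (epoch > checkpointId) this.pending.delete(epoch);
    }
    this.epoch = checkpointId + 1;
  }
}

const sent: string[] = [];
const outbox = new SideEffectOutbox((e) => sent.push(e.key));
const email = (key: string): SideEffect => ({ type: 'email', key, payload: {} });

outbox.buffer(email('order-confirm-1'));
outbox.onBarrier(1);
outbox.onCheckpointComplete(1); // dispatches order-confirm-1
outbox.buffer(email('order-confirm-2')); // epoch 2
outbox.onRestore(1); // failure: roll back to checkpoint 1
outbox.buffer(email('order-confirm-2')); // the replay regenerates it
outbox.onBarrier(2);
outbox.onCheckpointComplete(2);
console.log(sent); // [ 'order-confirm-1', 'order-confirm-2' ]
```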

Performance Characteristics

Throughput Impact

Checkpointing reduces throughput due to:

  1. Barrier alignment (blocking fast channels)
  2. State serialization (CPU overhead)
  3. State upload (I/O overhead)
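
$$\text{Throughput}_{\text{effective}} = \text{Throughput}_{\text{raw}} \times \frac{T_{\text{interval}} - T_{\text{checkpoint}}}{T_{\text{interval}}}$$

For a 60-second interval with 5-second checkpoints: $\frac{60 - 5}{60} = \frac{55}{60} \approx 91.7\%$ throughput. This is a pessimistic model: it assumes processing makes no progress for the whole checkpoint duration, whereas asynchronous snapshots keep processing records while state is uploaded.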

Recovery Time

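$$\begin{aligned}
T_{\text{recovery}} &= T_{\text{state\_restore}} + T_{\text{replay}} \\
T_{\text{replay}} &= \frac{\text{events\_since\_last\_checkpoint}}{\text{throughput}} \\
\text{events\_since\_last\_checkpoint} &\approx \text{throughput} \times T_{\text{interval}}
\end{aligned}$$

The last line is the worst case, a failure just before the next checkpoint completes. A shorter checkpoint interval therefore means faster recovery, but higher checkpointing overhead.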
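The throughput and recovery formulas pull in opposite directions as the interval changes. This sketch evaluates both for a few candidate intervals, assuming illustrative numbers: a 5 s checkpoint duration, a 20 s state restore, 100,000 events/s ingest, and replay at the same 100,000 events/s. `throughputFraction` and `worstCaseRecoverySec` are hypothetical helpers.

```typescript
// Effective throughput fraction: (T_interval - T_checkpoint) / T_interval
function throughputFraction(intervalSec: number, checkpointSec: number): number {
  return (intervalSec - checkpointSec) / intervalSec;
}

// Worst-case recovery: restore, plus replay of roughly one full interval of events
function worstCaseRecoverySec(
  intervalSec: number,
  restoreSec: number,
  eventsPerSec: number,
  replayEventsPerSec: number,
): number {
  const eventsSinceCheckpoint = eventsPerSec * intervalSec;
  return restoreSec + eventsSinceCheckpoint / replayEventsPerSec;
}

for (const interval of [15, 60, 300]) {
  const pct = (throughputFraction(interval, 5) * 100).toFixed(1);
  const rec = worstCaseRecoverySec(interval, 20, 100_000, 100_000);
  console.log(`${interval}s interval: ${pct}% throughput, worst-case recovery ~${rec}s`);
}
// 15s interval: 66.7% throughput, worst-case recovery ~35s
// 60s interval: 91.7% throughput, worst-case recovery ~80s
// 300s interval: 98.3% throughput, worst-case recovery ~320s
```

The 60 s row reproduces the 91.7% figure above; moving to 300 s buys under 7 points of throughput at the cost of four times the worst-case recovery.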

Mathematical Foundations

Chandy-Lamport Correctness Proof

The algorithm produces a consistent cut — a global state that could have occurred during execution.

Definition: A cut C is consistent if for every event e in C, all events that causally precede e are also in C:

$$\forall e \in C: \text{causes}(e) \subseteq C$$

Proof sketch:

  1. Barriers travel through the same channels as data
  2. An operator snapshots state only after receiving barriers from ALL inputs
  3. Therefore, the snapshot includes exactly the events that arrived before the barrier
  4. No event after the barrier is included in the snapshot
  5. This forms a consistent cut because the barrier ordering respects causal ordering
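The consistency definition can be checked mechanically for message-passing events. A minimal sketch, assuming each event is a send or a receive of a message with a unique ID, and assuming the cut already contains a prefix of each process's local events, so the only causal edges left to check are send-before-receive (`CutEvent` and `isConsistentCut` are hypothetical). A send without its receive is allowed: that message is in flight and belongs to channel state.

```typescript
type CutEvent =
  | { kind: 'send'; msgId: string }
  | { kind: 'receive'; msgId: string };

// Inconsistent exactly when some receive in the cut lacks its matching send.
function isConsistentCut(cut: CutEvent[]): boolean {
  const sent = new Set(cut.filter((e) => e.kind === 'send').map((e) => e.msgId));
  return cut.every((e) => e.kind !== 'receive' || sent.has(e.msgId));
}

// m1 is sent and received before the barrier: consistent.
console.log(isConsistentCut([{ kind: 'send', msgId: 'm1' }, { kind: 'receive', msgId: 'm1' }])); // true
// m2's receive is in the cut but its send is not: inconsistent.
console.log(isConsistentCut([{ kind: 'receive', msgId: 'm2' }])); // false
```

Barriers rule out the second case: a record sent after the barrier travels behind it on the same channel, so it cannot be received before the receiver's snapshot.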

Exactly-Once as Idempotent Replay

The system provides exactly-once through:

$$\text{replay}(C_n, E_{n+1}) = C_{n+1}$$

where $C_n$ is the checkpoint state and $E_{n+1}$ are the events between checkpoints $n$ and $n+1$. Because:

  1. $C_n$ is fixed (a consistent snapshot that every recovery attempt restores)
  2. Replay is deterministic (same events, same order, deterministic operators)
  3. $C_{n+1}$ is the same regardless of how many times replay occurs
$$\text{replay}(\text{replay}(C_n, E_{n+1}), \emptyset) = \text{replay}(C_n, E_{n+1})$$
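The replay argument can be illustrated with a pure fold, assuming the operator logic is deterministic and the events between $C_n$ and $C_{n+1}$ are read back from the source in the same order (the `apply` and `replay` functions are hypothetical). Every recovery attempt restarts from the same checkpoint, so any number of attempts yields the same $C_{n+1}$; re-applying the events on top of already-updated state would not.

```typescript
type State = ReadonlyMap<string, number>;
type UsageEvent = { key: string; amount: number };

// Deterministic operator logic: per-key running total, never mutating its input.
function apply(state: State, event: UsageEvent): State {
  const next = new Map(state);
  next.set(event.key, (next.get(event.key) ?? 0) + event.amount);
  return next;
}

// replay(C_n, E_{n+1}): fold the logged events over the restored checkpoint.
function replay(cn: State, events: ReadonlyArray<UsageEvent>): State {
  return events.reduce<State>((s, e) => apply(s, e), cn);
}

const cn: State = new Map([['alice', 10]]);
const log: UsageEvent[] = [
  { key: 'alice', amount: 5 },
  { key: 'bob', amount: 3 },
];

// Three recovery attempts, each restarting from the same checkpoint.
const attempts = [1, 2, 3].map(() => JSON.stringify([...replay(cn, log)]));
console.log(new Set(attempts).size); // 1

// What recovery must NOT do: re-apply the log on top of already-updated state.
const doubled = replay(replay(cn, log), log);
console.log(doubled.get('alice')); // 20
```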

Real-World War Stories

War Story

The Double-Charge Incident

A payment processing company migrated from at-least-once to exactly-once processing. During the migration, they kept the old at-least-once idempotency layer "just in case." After the migration, they removed the idempotency layer since "Flink provides exactly-once."

Three months later, a Flink cluster upgrade caused a 20-minute outage. During recovery, the checkpoint was corrupted and the job restarted from an older checkpoint. Without the idempotency layer, 47,000 transactions were processed twice, resulting in double charges totaling $2.3M.

Lesson: Exactly-once in the streaming layer does NOT eliminate the need for idempotency at the sink. Defense in depth. Always.

War Story

The 100GB Checkpoint That Wouldn't Complete

A team running a session windowing job with millions of active sessions found that checkpoints started failing after 3 months in production. The session state had grown to 100 GB, and checkpoints couldn't complete within the 10-minute timeout.

Timeline:

  • Month 1: Checkpoint = 2 GB, duration = 30s
  • Month 2: Checkpoint = 20 GB, duration = 3min
  • Month 3: Checkpoint = 100 GB, duration = timeout

Root cause: Session windows for bot users never closed (bots never went inactive). A single bot session accumulated 3 months of events.

Fix:

  1. Switched to incremental checkpoints (100 GB → 500 MB incremental)
  2. Added maximum session duration TTL (24 hours)
  3. Added state size monitoring with alerts

War Story

The Exactly-Once That Wasn't

A team believed they had end-to-end exactly-once because Flink checkpointing was enabled. But their sink wrote to Elasticsearch using the bulk API without idempotency keys. On every checkpoint recovery, the replay period wrote duplicate documents to Elasticsearch.

Over 6 months, 12% of their analytics data was duplicated. They only discovered it when a user reported that their dashboard showed impossible numbers.

Root cause: Exactly-once checkpointing only guarantees consistent internal state. The Elasticsearch sink was not part of the checkpoint (no 2PC support).

Fix: Added document IDs derived from event ID + window boundaries, making Elasticsearch writes idempotent via PUT /{index}/_doc/{id}.

Decision Framework

Choosing the Right Guarantee

| Scenario | Required Guarantee | Implementation |
|---|---|---|
| Log aggregation, metrics | At-least-once | Simple, no checkpointing overhead |
| User analytics, dashboards | At-least-once + dedup | Idempotent sinks |
| Financial transactions | Exactly-once | Checkpointing + transactional sinks |
| Regulatory compliance | Exactly-once + audit | Full 2PC + audit log |
| Ad impressions, clicks | At-least-once + dedup | Bloom filter deduplication |

Checkpoint Interval Selection

Advanced Topics

Exactly-Once Across Multiple Sinks

When a pipeline writes to multiple sinks, each sink must participate in the checkpoint:

typescript
class MultiSinkCheckpointCoordinator {
  private sinks: Map<string, TransactionalSink> = new Map();

  registerSink(name: string, sink: TransactionalSink): void {
    this.sinks.set(name, sink);
  }

  async preCommitAll(checkpointId: number): Promise<boolean> {
    const results = await Promise.allSettled(
      Array.from(this.sinks.entries()).map(([name, sink]) =>
        sink.preCommit(checkpointId).then(() => ({ name, success: true })),
      ),
    );

    const failures = results.filter((r) => r.status === 'rejected');
    if (failures.length > 0) {
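      // Any pre-commit failure: abort all sinks and fail the checkpoint; the job
      // then restarts from the last completed checkpoint and replays the records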
      await this.abortAll(checkpointId);
      return false;
    }
    return true;
  }

  async commitAll(checkpointId: number): Promise<void> {
    // Commit all sinks — if any fails, it will be retried on recovery
    await Promise.all(
      Array.from(this.sinks.values()).map((sink) =>
        sink.commit(checkpointId),
      ),
    );
  }

  async abortAll(checkpointId: number): Promise<void> {
    await Promise.allSettled(
      Array.from(this.sinks.values()).map((sink) =>
        sink.abort(checkpointId),
      ),
    );
  }
}

interface TransactionalSink {
  preCommit(checkpointId: number): Promise<void>;
  commit(checkpointId: number): Promise<void>;
  abort(checkpointId: number): Promise<void>;
}

Savepoints vs. Checkpoints

| Aspect | Checkpoint | Savepoint |
|---|---|---|
| Triggered by | System (periodic) | User (manual) |
| Purpose | Fault tolerance | Operational (upgrades, migration) |
| Format | Optimized (incremental) | Portable (full) |
| Retained | Last N checkpoints | Until explicitly deleted |
| State mapping | Automatic | Can remap operator IDs |

Research: Lightweight Asynchronous Snapshots

Recent research (Carbone et al.) explores snapshot algorithms that avoid barrier alignment entirely by using output determinism — tracking which outputs each operator has produced, rather than synchronizing with barriers. This promises lower checkpoint latency for high-throughput pipelines but requires deterministic operators.

$$\text{Snapshot}_{\text{async}}(O_i) = (S_i, \text{output\_log}_i[C_{n-1} : C_n])$$

On recovery, operators replay their output logs to reconstruct the downstream state, avoiding the need for synchronized barriers.

Key Takeaway

  • True exactly-once is "effectively-once": exactly-once state updates (via checkpointing based on Chandy-Lamport snapshots) combined with exactly-once output (via transactional sinks or idempotent writes).
  • Exactly-once in the streaming layer does NOT eliminate the need for sink-level idempotency; defense in depth is essential.
  • Use incremental checkpoints for state sizes over 1 GB; the savings grow with state size because only changed state is uploaded.

Exercise

Design End-to-End Exactly-Once for a Billing Pipeline

You are building a streaming billing pipeline that:

  • Reads usage events from Kafka (3 partitions, 100K events/sec)
  • Aggregates per-customer usage in 1-hour windows
  • Writes hourly invoices to PostgreSQL
  • Sends invoice notification emails

Design the exactly-once strategy covering:

  1. Checkpoint configuration
  2. Sink strategy (transactional vs idempotent)
  3. How to handle the email side effect
  4. Recovery behavior after a failure
Solution
  1. Checkpoint config: Interval = 30 seconds (low state size per window), timeout = 5 minutes, max concurrent = 1, tolerable failures = 3. Use incremental checkpoints with RocksDB state backend.

  2. Sink strategy: Use idempotent writes to PostgreSQL, NOT two-phase commit. Generate a deterministic invoice ID: SHA-256(customer_id + window_start + window_end). Use INSERT ... ON CONFLICT (invoice_id) DO UPDATE SET amount = EXCLUDED.amount. This handles re-runs gracefully.

  3. Email side effect: Buffer email notifications in the Flink state. Execute them ONLY after the checkpoint containing that window's results is confirmed complete (commit phase). Use a deterministic email dedup key: invoice-{invoice_id} so the email provider can deduplicate if the notification is sent twice.

  4. Recovery: On failure, Flink restores from the last successful checkpoint. Kafka offsets are reset to checkpoint positions. The 1-hour window state is restored. Events between the checkpoint and failure are replayed. The idempotent PostgreSQL sink handles duplicate writes (same invoice_id = no-op update). Emails buffered but not yet sent are re-buffered and sent after the next checkpoint.
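The invoice ID from step 2 can be sketched with Node's built-in `crypto` module. One assumption goes beyond the solution text: the fields are joined with a separator, because plain concatenation is ambiguous (customer `1` with window start `23` and customer `12` with window start `3` both concatenate to the same string). The `invoiceId` function name is hypothetical.

```typescript
import { createHash } from 'node:crypto';

// SHA-256 over customer_id, window_start, window_end joined with '|'
// (assumes customer IDs never contain '|').
function invoiceId(customerId: string, windowStartMs: number, windowEndMs: number): string {
  return createHash('sha256')
    .update(`${customerId}|${windowStartMs}|${windowEndMs}`)
    .digest('hex');
}

const start = Date.UTC(2024, 0, 1, 10); // 2024-01-01T10:00Z
const end = start + 3_600_000; // 1-hour window

// Deterministic: a replayed window produces the same ID, so the upsert is a no-op
// and the email dedup key invoice-{invoice_id} is stable across retries.
console.log(invoiceId('cust-42', start, end) === invoiceId('cust-42', start, end)); // true

// Without the separator both would hash "12360"; with it they differ.
console.log(invoiceId('1', 23, 60) === invoiceId('12', 3, 60)); // false
```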

Common Misconceptions

  • "Exactly-once means each event is processed exactly one time." In practice, events ARE reprocessed during recovery. The system ensures the EFFECT is as if each event was processed once (idempotent replay).
  • "Enabling Flink checkpointing gives you end-to-end exactly-once." Checkpointing only guarantees consistent internal state. Without transactional or idempotent sinks, external systems will see duplicates on recovery.
  • "Unaligned checkpoints are always better than aligned." Unaligned checkpoints avoid blocking but produce larger checkpoint sizes and slower recovery times. Use aligned checkpoints when checkpoint latency is acceptable.
  • "Exactly-once eliminates the need for idempotency at the sink." The $2.3M double-charge war story shows otherwise. Always implement sink-level idempotency as defense in depth.
  • "Shorter checkpoint intervals are always better." Shorter intervals mean faster recovery but higher overhead. Each checkpoint consumes CPU (serialization), I/O (upload), and network (barrier propagation).

In Production

  • Uber uses Flink with exactly-once checkpointing for their fare calculation pipeline, with idempotent writes to their ledger database using deterministic transaction IDs derived from trip and fare window identifiers.
  • Netflix runs exactly-once streaming pipelines for billing event processing, using incremental checkpoints with RocksDB to handle multi-terabyte state across their subscriber base.
  • LinkedIn implements two-phase commit sinks for their Kafka-to-Kafka exactly-once pipelines, using Kafka's transactional producer API with epoch-based fencing to handle zombie transactions.
  • Airbnb uses idempotent Elasticsearch sinks (document ID = event ID + window boundaries) instead of two-phase commit, trading slightly higher write volume for simpler infrastructure.
Quiz

1. What does the Chandy-Lamport algorithm achieve in stream processing?

A) It balances load across worker nodes
B) It creates a consistent distributed snapshot by injecting barriers into the data stream
C) It detects network partitions
D) It optimizes query execution plans

Answer

B) The Chandy-Lamport algorithm injects barrier markers into the data stream. Each operator snapshots its state when it receives barriers from ALL inputs, creating a globally consistent snapshot without pausing processing.

2. What is barrier alignment and why does it cause backpressure?

A) Aligning data records for sorting purposes
B) When an operator with multiple inputs receives a barrier from one input, it blocks that input until barriers arrive from all other inputs, causing upstream backpressure
C) Aligning checkpoints across different Flink clusters
D) Synchronizing clocks between operators

Answer

B) An operator must wait for barriers from ALL inputs before snapshotting. While waiting, it blocks the channel that already sent its barrier (to avoid including post-barrier data in the snapshot). This blocking propagates upstream as backpressure.

3. What is the zombie transaction problem?

A) Transactions that run too slowly
B) After recovery, pre-committed transactions from the failed epoch may still be pending in external systems
C) Transactions that consume too much memory
D) Concurrent write conflicts

Answer

B) Before a failure, a sink may have pre-committed a transaction that was never committed or aborted. After recovery, this "zombie" transaction lingers in the external system. Solution: epoch-based fencing, where a new producer with a higher epoch automatically aborts pending transactions from lower epochs.

4. Why are incremental checkpoints critical for large state sizes?

A) They use less CPU
B) They only upload state changes since the last checkpoint, reducing upload size from full state to delta
C) They compress data better
D) They avoid barrier alignment

Answer

B) A full checkpoint of 100 GB state must upload all 100 GB every time. An incremental checkpoint only uploads the SST files created since the last checkpoint (typically a few hundred MB). The gap widens as state grows: in the benchmark above, 100 GB takes 8 minutes as a full checkpoint vs 10 seconds incrementally.

5. How do idempotent sinks achieve exactly-once output?

A) By using database transactions
B) By generating deterministic keys from the input data so that re-writing the same data produces the same result (upsert semantics)
C) By buffering all output until the pipeline completes
D) By connecting directly to Flink's checkpoint coordinator

Answer

B) Idempotent sinks use deterministic keys (derived from event ID + window boundaries) with upsert semantics (INSERT ON CONFLICT DO UPDATE). Writing the same data twice with the same key is a no-op update, achieving effectively-once output.



One-Liner Summary: Exactly-once is really "effectively-once" -- consistent checkpoints plus idempotent or transactional sinks ensure that failures and replays never produce duplicates or data loss.

"What I cannot create, I do not understand." — Richard Feynman