Watermarks in Stream Processing
Why Watermarks Exist
In a perfect world, events arrive in order and instantly. Reality is different — events arrive late, out of order, and with unpredictable delays. Consider a mobile analytics pipeline:
- User performs action at 10:00:00 (event time)
- Phone buffers event due to poor connectivity
- Phone reconnects at 10:05:00
- Event arrives at server at 10:05:02 (processing time)
The system must decide: when is it safe to compute results for the 10:00-10:05 window? If it waits too long, latency suffers. If it computes too early, results are incomplete.
Watermarks are the mechanism that answers this question. A watermark is an assertion: "No more events with timestamp less than W will arrive."
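To make the window question concrete, the sketch below assigns the example event to a five-minute tumbling window by event time and checks whether a given watermark allows that window to fire. The helper names and the half-open window convention `[start, end)` are illustrative assumptions, not taken from any particular engine.

```typescript
// Hypothetical helpers for illustration; windows are half-open: [start, end).
const WINDOW_SIZE_MS = 5 * 60 * 1000;

/** Start of the tumbling window that contains the given event time. */
function windowStartFor(eventTimeMs: number, sizeMs: number): number {
  return Math.floor(eventTimeMs / sizeMs) * sizeMs;
}

/** A window may fire once the watermark has reached its end. */
function canFire(windowStartMs: number, sizeMs: number, watermarkMs: number): boolean {
  return watermarkMs >= windowStartMs + sizeMs;
}

const actionTime = Date.parse('2026-03-18T10:00:00Z'); // event time of the user action
const assignedStart = windowStartFor(actionTime, WINDOW_SIZE_MS);

// The event belongs to the 10:00-10:05 window even though it arrives at 10:05:02.
console.log(new Date(assignedStart).toISOString());

// With the watermark at 10:04:30 the window stays open; at 10:05:00 it may fire.
console.log(canFire(assignedStart, WINDOW_SIZE_MS, Date.parse('2026-03-18T10:04:30Z'))); // false
console.log(canFire(assignedStart, WINDOW_SIZE_MS, Date.parse('2026-03-18T10:05:00Z'))); // true
```

Assignment depends only on event time, while the decision to fire depends only on the watermark.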
Historical Context
Google's MillWheel (2013) introduced the concept of low watermarks for tracking event-time completeness. The Dataflow Model paper (Akidau et al., 2015) formalized watermarks as a first-class primitive. Apache Flink was among the first open-source systems to implement these watermark semantics. Kafka Streams, Spark Structured Streaming, and others followed with varying levels of support.
First Principles
The Two Clocks Problem
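Every streaming system operates with two clocks:
- Event time: when the event actually occurred, as recorded at the source (10:00:00 in the example above)
- Processing time: when the system observes the event (10:05:02 in the example above)
The relationship between them: assuming synchronized clocks, an event cannot be processed before it occurs, so processing time is never earlier than event time. The difference between the two is the event-time skew.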
In aggregate, plotting event time against processing time shows the event-time skew distribution:
Processing Time
| . . .
| . . ideal (slope = 1)
| . . /
| . . /
| . . / actual (events arrive late)
| .. /
|. /
+------------ Event Time
The gap between the ideal line and actual data points is the watermark lag.
Formal Definition
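A watermark is a function W(t_p) that maps processing time t_p to an event-time value.
Such that: every event arriving after processing time t_p has event time t_e >= W(t_p).
Monotonicity guarantee: if t_p1 <= t_p2, then W(t_p1) <= W(t_p2).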
Perfect vs. Heuristic Watermarks
Perfect watermark: Guarantees that no late data will ever arrive. Possible only when the input source provides completeness information (e.g., a bounded file, or a system that tracks all outstanding events).
Heuristic watermark: Estimates completeness based on observed data patterns. May allow late data.
WARNING
Most production systems use heuristic watermarks. You MUST handle late data even with watermarks in place. Watermarks are a best-effort signal, not a guarantee.
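One common way to act on this warning is to give each window an allowed-lateness budget: events that arrive after the watermark but within the budget update the result already emitted, and anything older goes to a side channel. The sketch below illustrates that policy. The `allowedLatenessMs` parameter, the `Disposition` labels, and the function name are assumptions made for illustration, not an API described in this text.

```typescript
// Illustrative policy only; the labels and parameter names are hypothetical.
type Disposition = 'on-time' | 'late-update' | 'side-output';

/**
 * Classify an event by comparing the end of its window with the current watermark.
 * - on-time: the watermark has not yet closed the window
 * - late-update: the window is closed but still inside the allowed-lateness budget
 * - side-output: too late, route to a dead-letter or batch path
 */
function classifyEvent(
  windowEndMs: number,
  watermarkMs: number,
  allowedLatenessMs: number,
): Disposition {
  if (watermarkMs < windowEndMs) return 'on-time';
  if (watermarkMs < windowEndMs + allowedLatenessMs) return 'late-update';
  return 'side-output';
}

// Window ends at 600s of event time, with a 60s lateness budget.
console.log(classifyEvent(600_000, 590_000, 60_000)); // on-time
console.log(classifyEvent(600_000, 630_000, 60_000)); // late-update
console.log(classifyEvent(600_000, 660_000, 60_000)); // side-output
```

A larger budget keeps window state alive longer, so the value trades memory for completeness.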
Core Mechanics
Watermark Generation
Strategy 1: Bounded Out-of-Orderness
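The most common strategy. Assumes events can arrive at most d time units out of order, so the watermark trails the largest timestamp seen so far by d: W = max(event timestamps seen) - d.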
class BoundedOutOfOrdernessWatermarkGenerator {
private maxTimestampSeen: number = -Infinity;
private lastEmittedWatermark: number = -Infinity;
constructor(
private readonly maxOutOfOrdernessMs: number,
) {}
/**
* Called for every event. Updates internal state.
*/
onEvent(eventTimestamp: number): void {
if (eventTimestamp > this.maxTimestampSeen) {
this.maxTimestampSeen = eventTimestamp;
}
}
/**
* Called periodically (e.g., every 200ms) to emit watermarks.
* Watermarks should not be emitted per-event for performance.
*/
generateWatermark(): number {
const potentialWatermark =
this.maxTimestampSeen - this.maxOutOfOrdernessMs;
// Ensure monotonicity
if (potentialWatermark > this.lastEmittedWatermark) {
this.lastEmittedWatermark = potentialWatermark;
}
return this.lastEmittedWatermark;
}
}
// Allow up to 5 seconds of out-of-orderness
const generator = new BoundedOutOfOrdernessWatermarkGenerator(5_000);
// Simulate events
generator.onEvent(Date.parse('2026-03-18T10:00:03Z'));
generator.onEvent(Date.parse('2026-03-18T10:00:01Z')); // out of order
generator.onEvent(Date.parse('2026-03-18T10:00:07Z'));
const wm = generator.generateWatermark();
// wm = 10:00:07 - 5s = 10:00:02
// The event at 10:00:01 is NOT late: it is below the watermark value,
// but it arrived before that watermark was emitted
Strategy 2: Punctuation-Based (Watermark Markers in Source)
Some sources embed explicit watermark signals in the data stream:
type StreamElement<T> =
| { type: 'data'; value: T; timestamp: number }
| { type: 'watermark'; timestamp: number };
class PunctuationWatermarkGenerator<T> {
private currentWatermark: number = -Infinity;
processElement(element: StreamElement<T>): {
watermarkAdvanced: boolean;
newWatermark: number;
} {
if (element.type === 'watermark') {
if (element.timestamp > this.currentWatermark) {
this.currentWatermark = element.timestamp;
return { watermarkAdvanced: true, newWatermark: this.currentWatermark };
}
}
return { watermarkAdvanced: false, newWatermark: this.currentWatermark };
}
getCurrentWatermark(): number {
return this.currentWatermark;
}
}
Strategy 3: Processing-Time Based (Fallback)
When event timestamps are unreliable:
class ProcessingTimeWatermarkGenerator {
generateWatermark(): number {
return Date.now();
}
}
DANGER
Processing-time watermarks provide no ordering guarantees. Use only when event timestamps are unavailable or meaningless.
Watermark Propagation
In a DAG of operators, watermarks flow downstream. Each operator computes its output watermark as a function of its input watermarks:
Multi-input operator watermark rule: W_out = min(W_input1, W_input2, ..., W_inputN)
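Single-input operator with buffering (e.g., sort): the output watermark must also be held back by the oldest element still in the buffer, so W_out = min(W_in, oldest buffered timestamp).
A propagator implementing the multi-input rule: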
class WatermarkPropagator {
private inputWatermarks: Map<string, number> = new Map();
private outputWatermark: number = -Infinity;
constructor(private readonly inputIds: string[]) {
for (const id of inputIds) {
this.inputWatermarks.set(id, -Infinity);
}
}
/**
* Called when an input channel advances its watermark.
* Returns the new output watermark if it advanced, null otherwise.
*/
updateInputWatermark(
inputId: string,
watermark: number,
): number | null {
const current = this.inputWatermarks.get(inputId);
if (current === undefined) {
throw new Error(`Unknown input: ${inputId}`);
}
if (watermark < current) {
throw new Error(
`Watermark regression on ${inputId}: ${watermark} < ${current}`,
);
}
this.inputWatermarks.set(inputId, watermark);
// Output watermark = min of all input watermarks
const newOutput = Math.min(
...Array.from(this.inputWatermarks.values()),
);
if (newOutput > this.outputWatermark) {
this.outputWatermark = newOutput;
return this.outputWatermark;
}
return null; // No advancement
}
getCurrentOutputWatermark(): number {
return this.outputWatermark;
}
}
Idle Source Handling
A common production problem: one of N partitions stops producing events. Its watermark freezes, holding back the global watermark:
Partition 0: events flowing, WM = 10:05:00
Partition 1: events flowing, WM = 10:04:50
Partition 2: no events for 10 minutes, WM = 09:55:00 <-- STUCK
Global WM = min(10:05:00, 10:04:50, 09:55:00) = 09:55:00
Solution: Idle timeout with watermark advancement
interface PartitionState {
lastEventTime: number;
lastActivityProcessingTime: number;
watermark: number;
idle: boolean;
}
class IdleAwareWatermarkTracker {
private partitions: Map<string, PartitionState> = new Map();
constructor(
private readonly idleTimeoutMs: number,
private readonly maxOutOfOrdernessMs: number,
) {}
registerPartition(partitionId: string): void {
this.partitions.set(partitionId, {
lastEventTime: -Infinity,
lastActivityProcessingTime: Date.now(),
watermark: -Infinity,
idle: false,
});
}
onEvent(partitionId: string, eventTimestamp: number): void {
const state = this.partitions.get(partitionId);
if (!state) return;
state.lastEventTime = Math.max(state.lastEventTime, eventTimestamp);
state.lastActivityProcessingTime = Date.now();
state.watermark = state.lastEventTime - this.maxOutOfOrdernessMs;
state.idle = false;
}
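private lastGlobalWatermark: number = -Infinity;
computeGlobalWatermark(): number {
const now = Date.now();
let globalWm = Infinity;
for (const [partitionId, state] of this.partitions) {
// Check if partition is idle
if (now - state.lastActivityProcessingTime > this.idleTimeoutMs) {
if (!state.idle) {
console.log(`Partition ${partitionId} marked as idle`);
state.idle = true;
}
continue; // Exclude idle partitions from watermark computation
}
globalWm = Math.min(globalWm, state.watermark);
}
// Keep the result monotonic: hold the last value when every partition is idle,
// and never regress when an idle partition resumes with an older watermark
if (globalWm !== Infinity && globalWm > this.lastGlobalWatermark) {
this.lastGlobalWatermark = globalWm;
}
return this.lastGlobalWatermark;
}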
}
Watermark Delay Analysis
Quantifying Watermark Lag
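Watermark lag measures how far behind real-time the watermark is: lag(t_p) = t_p - W(t_p)
For bounded out-of-orderness with max delay d, the lag is at least d, because the watermark always trails the newest observed timestamp by d (assuming no event carries a timestamp from the future).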
Impact on End-to-End Latency
Total latency from event occurrence to result emission:
L_total = L_ingest + d + L_window + L_compute
Where:
- L_ingest: time from event to source
- d: bounded out-of-orderness parameter
- L_window: remaining time until window closes
- L_compute: computation time
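As a worked example of this decomposition, the sketch below adds up the four components for a single event. The field names and the sample numbers are hypothetical; only the additive structure comes from the text above.

```typescript
// Hypothetical field names; each maps to one component of the total latency.
interface LatencyBreakdown {
  ingestMs: number;          // time from event to source
  watermarkDelayMs: number;  // bounded out-of-orderness parameter
  windowRemainingMs: number; // remaining time until the window closes
  computeMs: number;         // computation time
}

function totalLatencyMs(b: LatencyBreakdown): number {
  return b.ingestMs + b.watermarkDelayMs + b.windowRemainingMs + b.computeMs;
}

/** Time left in the tumbling window that contains the event (non-negative event time assumed). */
function remainingInWindowMs(eventTimeMs: number, windowSizeMs: number): number {
  return windowSizeMs - (eventTimeMs % windowSizeMs);
}

// An event 40s into a 60s window, with a 5s bounded delay.
const breakdown: LatencyBreakdown = {
  ingestMs: 200,
  watermarkDelayMs: 5_000,
  windowRemainingMs: remainingInWindowMs(40_000, 60_000), // 20_000
  computeMs: 50,
};

console.log(totalLatencyMs(breakdown)); // 25250
```

In this made-up case the window remainder and the watermark delay dominate, while ingestion and computation contribute about one percent.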
TIP
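The watermark delay d is the single most impactful tuning parameter: too low causes data loss, too high causes unnecessary latency.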
Choosing the Right Bounded Delay
Analyze the empirical distribution of event-time skew:
class SkewAnalyzer {
private skewValues: number[] = [];
recordSkew(eventTime: number, processingTime: number): void {
this.skewValues.push(processingTime - eventTime);
}
getPercentile(p: number): number {
const sorted = [...this.skewValues].sort((a, b) => a - b);
const index = Math.ceil((p / 100) * sorted.length) - 1;
return sorted[Math.max(0, index)];
}
recommend(): {
conservative: number;
balanced: number;
aggressive: number;
} {
return {
conservative: this.getPercentile(99.9), // ~0.1% of events arrive late
balanced: this.getPercentile(99), // ~1% of events arrive late
aggressive: this.getPercentile(95), // ~5% of events arrive late
};
}
printDistribution(): void {
const percentiles = [50, 75, 90, 95, 99, 99.5, 99.9];
console.log('Event-time skew distribution:');
for (const p of percentiles) {
console.log(` p${p}: ${this.getPercentile(p)}ms`);
}
}
}
Example real-world skew distributions:
| Source Type | p50 | p95 | p99 | p99.9 |
|---|---|---|---|---|
| Server logs (same DC) | 10ms | 50ms | 200ms | 1s |
| Mobile app events | 500ms | 5s | 30s | 5min |
| IoT sensors (cellular) | 1s | 10s | 2min | 1hr |
| Cross-region replication | 50ms | 200ms | 1s | 10s |
Edge Cases & Failure Modes
Watermark Regression
Watermarks must be monotonically non-decreasing. A regression indicates a bug:
class WatermarkValidator {
private lastWatermark: number = -Infinity;
private regressionCount: number = 0;
validate(watermark: number): boolean {
if (watermark < this.lastWatermark) {
this.regressionCount++;
console.error(
`Watermark regression detected! ` +
`Current: ${watermark}, Previous: ${this.lastWatermark}, ` +
`Regression count: ${this.regressionCount}`,
);
return false;
}
this.lastWatermark = watermark;
return true;
}
}
Common causes of watermark regression:
- Source rebalancing (Kafka consumer group rebalance)
- Checkpoint restore with inconsistent state
- Clock synchronization issues (NTP jumps)
The Stale Watermark Problem
If no events arrive, the watermark stalls. This prevents windows from firing even though real time is advancing.
Solutions:
- Idle timeouts: Mark partitions as idle after N seconds of inactivity
- Processing-time timers: Fire windows after a wall-clock deadline even without watermark advancement
- Synthetic events: Inject heartbeat events with current timestamps
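The processing-time timer option can be sketched as a trigger that fires a window when either the watermark passes the window end or a wall-clock deadline expires, whichever comes first. The class name, the `deadlineMs` parameter, and the explicit `nowMs` argument (passed in to keep the example deterministic) are illustrative assumptions.

```typescript
// Hypothetical trigger; not modeled on a specific engine's API.
type FireReason = 'watermark' | 'deadline';

class DeadlineWindowTrigger {
  private fired = false;

  constructor(
    private readonly windowEndMs: number, // event-time end of the window
    private readonly deadlineMs: number,  // wall-clock time at which to stop waiting
  ) {}

  /** Returns why the window should fire now, or null if it should keep waiting. */
  check(watermarkMs: number, nowMs: number): FireReason | null {
    if (this.fired) return null; // fire at most once
    if (watermarkMs >= this.windowEndMs) {
      this.fired = true;
      return 'watermark';
    }
    if (nowMs >= this.deadlineMs) {
      this.fired = true;
      return 'deadline'; // the result may be incomplete
    }
    return null;
  }
}

// Window ends at 60s of event time; give up waiting at 90s of wall-clock time.
const stalledTrigger = new DeadlineWindowTrigger(60_000, 90_000);
console.log(stalledTrigger.check(55_000, 70_000)); // null: keep waiting
console.log(stalledTrigger.check(55_000, 90_000)); // 'deadline': watermark stalled at 55s
console.log(stalledTrigger.check(61_000, 95_000)); // null: already fired
```

A result fired by the deadline should be marked as potentially incomplete so that downstream consumers can tell it apart from a watermark-driven result.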
Watermark in Multi-Tenant Systems
When multiple logical streams share physical infrastructure, watermarks from one stream can contaminate another:
class IsolatedWatermarkTracker {
private perTenantWatermarks: Map<string, number> = new Map();
updateWatermark(tenantId: string, watermark: number): void {
const current = this.perTenantWatermarks.get(tenantId) ?? -Infinity;
if (watermark > current) {
this.perTenantWatermarks.set(tenantId, watermark);
}
}
getWatermark(tenantId: string): number {
return this.perTenantWatermarks.get(tenantId) ?? -Infinity;
}
// Do NOT compute global min across tenants — that defeats isolation
}
DANGER
Never compute a global watermark across tenants. A single slow tenant will freeze all other tenants' windows.
Performance Characteristics
Watermark Emission Frequency
Emitting watermarks too frequently wastes resources (each watermark is broadcast to all downstream operators). Too infrequently increases latency.
Recommended intervals:
| Throughput | Watermark Interval | Rationale |
|---|---|---|
| < 1K events/s | 1000ms | Low volume, latency tolerance |
| 1K-100K events/s | 200ms | Default Flink setting |
| 100K-1M events/s | 100ms | Balance latency/overhead |
| > 1M events/s | 50ms | Minimize window-fire delay |
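Overhead per watermark emission:
Each watermark must be sent to all downstream subtasks, creating on the order of P_up x P_down messages per emission when P_up upstream subtasks each broadcast to P_down downstream subtasks.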
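A rough way to size this cost, assuming an all-to-all exchange in which every upstream subtask sends each watermark to every downstream subtask, is sketched below. The function name and the parallelism values are hypothetical.

```typescript
/**
 * Estimated watermark messages per second across one all-to-all exchange.
 * Assumption: every upstream subtask broadcasts each watermark to every downstream subtask.
 */
function watermarkMessagesPerSecond(
  upstreamSubtasks: number,
  downstreamSubtasks: number,
  intervalMs: number,
): number {
  const emissionsPerSecond = 1000 / intervalMs;
  return upstreamSubtasks * downstreamSubtasks * emissionsPerSecond;
}

console.log(watermarkMessagesPerSecond(32, 32, 200)); // 5120
console.log(watermarkMessagesPerSecond(32, 32, 50));  // 20480
```

Under this assumption the message count grows with the product of the two parallelism values, so shortening the interval costs more on wide exchanges.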
Memory Overhead
Watermark tracking requires per-partition state, so memory grows linearly with the number of partitions.
Typically 16-32 bytes per partition. For 10,000 partitions: at most ~320 KB. Negligible.
Mathematical Foundations
Watermark as a Progress Measure
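Formally, a watermark is a progress measure in a partially ordered computation: a monotonically non-decreasing function of processing time that lower-bounds the event time of every input still to come.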
Completeness vs. Latency Tradeoff
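The fundamental tradeoff is captured by two quantities that both depend on the bounded delay d:
- Completeness C(d): the fraction of events that arrive before the watermark passes their timestamp
- Latency L(d): the time results are held back waiting for the watermark
These pull against each other: raising d increases C(d) but also increases L(d) proportionally,
where d is the bounded out-of-orderness delay.
The optimal d is the smallest delay that meets the completeness target: d* = min { d : C(d) >= c_target }
where c_target is the fraction of events the application needs in the on-time result (e.g., 0.99).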
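Treating completeness as the fraction of observed events whose skew is at most d, the tradeoff can be explored empirically. This is a minimal sketch under that assumption; the function names and the sample skew values are made up for illustration.

```typescript
/** Fraction of observed skews covered by a given bounded delay (empirical completeness). */
function completenessAt(skewsMs: number[], delayMs: number): number {
  if (skewsMs.length === 0) return 1;
  const covered = skewsMs.filter((s) => s <= delayMs).length;
  return covered / skewsMs.length;
}

/** Smallest observed skew that, used as the delay, reaches the target completeness. */
function smallestDelayFor(skewsMs: number[], target: number): number {
  if (skewsMs.length === 0) return 0;
  const sorted = [...skewsMs].sort((a, b) => a - b);
  const rawIndex = Math.ceil(target * sorted.length) - 1;
  const index = Math.min(sorted.length - 1, Math.max(0, rawIndex));
  return sorted[index];
}

// Hypothetical skew samples in milliseconds, with one long-tail straggler.
const observedSkews = [100, 200, 250, 400, 500, 800, 1_000, 2_000, 5_000, 30_000];

console.log(completenessAt(observedSkews, 1_000)); // 0.7
console.log(smallestDelayFor(observedSkews, 0.9)); // 5000
console.log(completenessAt(observedSkews, 5_000)); // 0.9
```

In this sample, reaching 90% completeness takes a 5s delay, while covering the last straggler would take 30s. The long tail is what makes the final few percent of completeness expensive in latency.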
Information-Theoretic Bounds
For a perfect watermark, you need complete information about all in-flight events. The information required is the event time of every event that has been produced but has not yet reached the system.
This is generally unbounded in open systems, which is why perfect watermarks are only possible in closed systems (bounded sources).
Real-World War Stories
War Story
The Midnight Watermark Cliff
An e-commerce company processing clickstream data noticed that every night at midnight, their real-time dashboards would freeze for 10-15 minutes. Investigation revealed:
- Their watermark delay was set to 30 seconds
- At midnight, traffic dropped 95%, but a few automated systems still generated events
- One system had a clock skewed 10 minutes ahead
- Events from this system at "00:10" prevented the watermark from advancing past midnight
- All midnight-boundary windows (daily aggregations) were delayed 10 minutes
Fix: Added per-source watermark tracking with outlier detection. Sources whose timestamps were more than 2 standard deviations from the median were excluded from watermark computation.
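The outlier rule in this fix could look roughly like the sketch below: compute the median and standard deviation of the per-source watermarks, drop sources that sit more than two standard deviations from the median, and take the minimum of the rest. The type and function names are hypothetical, the inputs are assumed to be finite, and the team's actual implementation is not described in the text.

```typescript
// Hypothetical types and helpers; watermarks are assumed to be finite numbers.
interface SourceWatermark { sourceId: string; watermark: number; }
interface FilteredWatermark { globalWatermark: number; excluded: string[]; }

function medianOf(values: number[]): number {
  const sorted = [...values].sort((a, b) => a - b);
  const mid = Math.floor(sorted.length / 2);
  return sorted.length % 2 === 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid];
}

function standardDeviationOf(values: number[]): number {
  const mean = values.reduce((sum, v) => sum + v, 0) / values.length;
  const variance =
    values.reduce((sum, v) => sum + (v - mean) * (v - mean), 0) / values.length;
  return Math.sqrt(variance);
}

function globalWatermarkExcludingOutliers(
  sources: SourceWatermark[],
  maxDeviations: number = 2,
): FilteredWatermark {
  if (sources.length === 0) return { globalWatermark: -Infinity, excluded: [] };
  const values = sources.map((s) => s.watermark);
  const median = medianOf(values);
  const threshold = maxDeviations * standardDeviationOf(values);
  const excluded: string[] = [];
  let globalWatermark = Infinity;
  sources.forEach((s) => {
    if (Math.abs(s.watermark - median) > threshold) {
      excluded.push(s.sourceId); // too far from the median: ignore this source
    } else {
      globalWatermark = Math.min(globalWatermark, s.watermark);
    }
  });
  return {
    globalWatermark: globalWatermark === Infinity ? -Infinity : globalWatermark,
    excluded,
  };
}

// Five well-behaved sources near t=1000ms and one whose clock runs 10 minutes ahead.
const filtered = globalWatermarkExcludingOutliers([
  { sourceId: 'a', watermark: 1_000 },
  { sourceId: 'b', watermark: 1_010 },
  { sourceId: 'c', watermark: 990 },
  { sourceId: 'd', watermark: 1_005 },
  { sourceId: 'e', watermark: 995 },
  { sourceId: 'skewed', watermark: 601_000 },
]);
console.log(filtered.excluded);        // [ 'skewed' ]
console.log(filtered.globalWatermark); // 990
```

With very few sources a single outlier inflates the standard deviation enough to hide itself, so a rule like this is only meaningful once there are at least a handful of sources to compare.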
War Story
The Kafka Rebalance Watermark Storm
A team running Flink with Kafka sources experienced watermark "storms" during consumer group rebalances:
- Rebalance occurs: consumers stop reading for 5-10 seconds
- After rebalance: consumers resume from committed offsets
- Old events (from before rebalance) flood in with old timestamps
- Watermark cannot advance past the oldest re-read event
- All downstream windows are blocked
Root cause: After rebalance, consumers re-read events already processed. The watermark generator sees these old timestamps as "new" events and refuses to advance.
Fix: Track per-partition high-water marks across rebalances. After a rebalance, initialize each partition's watermark generator with that partition's previous watermark rather than deriving it from the re-read events.
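One way to picture this fix: save each partition's last watermark when the partition is revoked, and seed the new generator with it on assignment. The sketch below assumes a fresh generator is created per assignment; both class names are hypothetical and no real Kafka or Flink API is used.

```typescript
/** Remembers the highest watermark emitted per partition across rebalances (illustrative). */
class PartitionWatermarkStore {
  private saved: Record<string, number> = {};

  save(partitionId: string, watermarkMs: number): void {
    const previous = this.saved[partitionId];
    if (previous === undefined || watermarkMs > previous) {
      this.saved[partitionId] = watermarkMs;
    }
  }

  restore(partitionId: string): number {
    const previous = this.saved[partitionId];
    return previous === undefined ? -Infinity : previous;
  }
}

/** Bounded out-of-orderness generator that starts from a restored watermark. */
class ResumableWatermarkGenerator {
  private maxTimestampSeen = -Infinity;
  private watermark: number;

  constructor(private readonly maxOutOfOrdernessMs: number, initialWatermarkMs: number) {
    this.watermark = initialWatermarkMs;
  }

  onEvent(eventTimestamp: number): void {
    if (eventTimestamp > this.maxTimestampSeen) this.maxTimestampSeen = eventTimestamp;
  }

  currentWatermark(): number {
    const candidate = this.maxTimestampSeen - this.maxOutOfOrdernessMs;
    if (candidate > this.watermark) this.watermark = candidate; // never move backwards
    return this.watermark;
  }
}

const store = new PartitionWatermarkStore();

// Before the rebalance: newest event at 30s, 5s bound, so the watermark is 25s.
const beforeRebalance = new ResumableWatermarkGenerator(5_000, store.restore('clicks-0'));
beforeRebalance.onEvent(30_000);
store.save('clicks-0', beforeRebalance.currentWatermark());

// After the rebalance: a re-read event at 10s must not pull the watermark back.
const afterRebalance = new ResumableWatermarkGenerator(5_000, store.restore('clicks-0'));
afterRebalance.onEvent(10_000);
console.log(afterRebalance.currentWatermark()); // 25000
afterRebalance.onEvent(40_000);
console.log(afterRebalance.currentWatermark()); // 35000
```

Re-read events then count as late data relative to the restored watermark, so they go through the pipeline's normal late-data path instead of stalling it.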
War Story
The IoT Watermark That Traveled Back in Time
An IoT platform ingesting sensor data set a 1-hour bounded delay watermark. One class of sensors stored events in flash memory when offline and uploaded in bulk when reconnecting. A sensor that was offline for 3 weeks uploaded 500,000 events spanning 3 weeks of event time.
The watermark plummeted 3 weeks into the past, causing:
- Every window in the last 3 weeks to "reopen"
- State explosion (all those windows needed to be reconstructed)
- Checkpoint size grew from 2 GB to 200 GB
- Checkpoint timeouts → no checkpoints → no fault tolerance
Fix: Ingestion-time filtering. Events older than 24 hours are routed to a batch processing pipeline instead of the streaming pipeline. The streaming pipeline only handles "recent" data.
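The routing rule in this fix fits in a few lines. The sketch below uses the 24-hour cutoff from the text; the `Route` labels and the function name are illustrative assumptions.

```typescript
// Hypothetical router; the 24-hour cutoff comes from the fix described above.
type Route = 'streaming' | 'batch';

const MAX_STREAMING_AGE_MS = 24 * 60 * 60 * 1000;

/** Decide at ingestion time whether an event is recent enough for the streaming path. */
function routeByAge(
  eventTimeMs: number,
  ingestTimeMs: number,
  maxAgeMs: number = MAX_STREAMING_AGE_MS,
): Route {
  return ingestTimeMs - eventTimeMs > maxAgeMs ? 'batch' : 'streaming';
}

const ingestTime = Date.parse('2026-03-18T10:00:00Z');
const threeWeeksMs = 21 * 24 * 60 * 60 * 1000;

console.log(routeByAge(ingestTime - 10 * 60 * 1000, ingestTime)); // streaming
console.log(routeByAge(ingestTime - threeWeeksMs, ingestTime));   // batch
```

Filtering before the watermark generator sees the event keeps bulk uploads from touching streaming state at all.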
Decision Framework
Watermark Strategy Selection
| Scenario | Strategy | Delay Setting |
|---|---|---|
| Controlled sources (internal services) | Bounded out-of-orderness | p99 of observed skew |
| Uncontrolled sources (mobile, IoT) | Bounded + idle detection | p99.9 + idle timeout |
| Sources with embedded watermarks | Punctuation-based | N/A (source provides) |
| No event timestamps available | Processing-time | N/A |
| Mixed sources | Per-source strategy | Varies |
Monitoring Watermark Health
Key metrics to track:
interface WatermarkMetrics {
// Current watermark value
currentWatermark: number;
// Lag: processing time - watermark
watermarkLag: number;
// Rate of watermark advancement (should be ~1 second per second)
watermarkAdvancementRate: number;
// Number of late events (arrived after watermark)
lateEventCount: number;
// Late event ratio
lateEventRatio: number;
// Idle partitions count
idlePartitions: number;
}
class WatermarkMonitor {
private lastWatermark: number = -Infinity;
private lastCheckTime: number = Date.now();
private lateEvents: number = 0;
private totalEvents: number = 0;
recordEvent(eventTime: number, currentWatermark: number): void {
this.totalEvents++;
if (eventTime < currentWatermark) {
this.lateEvents++;
}
}
getMetrics(currentWatermark: number): WatermarkMetrics {
const now = Date.now();
const elapsed = now - this.lastCheckTime;
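// Guard the first call: lastWatermark starts at -Infinity, which would yield an infinite rate
const advancementRate =
elapsed > 0 && isFinite(this.lastWatermark)
? (currentWatermark - this.lastWatermark) / elapsed
: 0;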
const metrics: WatermarkMetrics = {
currentWatermark,
watermarkLag: now - currentWatermark,
watermarkAdvancementRate: advancementRate,
lateEventCount: this.lateEvents,
lateEventRatio:
this.totalEvents > 0 ? this.lateEvents / this.totalEvents : 0,
idlePartitions: 0, // populated externally
};
this.lastWatermark = currentWatermark;
this.lastCheckTime = now;
return metrics;
}
}
TIP
Set alerts on these thresholds:
- watermarkLag > 5 minutes: Something is blocking watermark advancement
- lateEventRatio > 5%: Your bounded delay is too aggressive
- watermarkAdvancementRate < 0.5: Watermark advancing at less than half real-time speed
- idlePartitions > 20% of total: Possible partition starvation
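These thresholds translate directly into a small rule evaluator. In the sketch below the `HealthSnapshot` shape and the alert names are hypothetical; the threshold values are the ones listed above.

```typescript
// Hypothetical snapshot shape; thresholds mirror the list above.
interface HealthSnapshot {
  watermarkLagMs: number;
  lateEventRatio: number;  // 0..1
  advancementRate: number; // event-time ms advanced per wall-clock ms
  idlePartitions: number;
  totalPartitions: number;
}

function evaluateAlerts(s: HealthSnapshot): string[] {
  const alerts: string[] = [];
  if (s.watermarkLagMs > 5 * 60 * 1000) alerts.push('watermark-lag');
  if (s.lateEventRatio > 0.05) alerts.push('late-events');
  if (s.advancementRate < 0.5) alerts.push('slow-advancement');
  if (s.totalPartitions > 0 && s.idlePartitions / s.totalPartitions > 0.2) {
    alerts.push('idle-partitions');
  }
  return alerts;
}

// 7 minutes of lag and 3 of 12 partitions idle; the other two signals are healthy.
console.log(
  evaluateAlerts({
    watermarkLagMs: 7 * 60 * 1000,
    lateEventRatio: 0.01,
    advancementRate: 0.9,
    idlePartitions: 3,
    totalPartitions: 12,
  }),
); // [ 'watermark-lag', 'idle-partitions' ]
```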
Advanced Topics
Adaptive Watermarks
Instead of a fixed bounded delay, adapt based on observed patterns:
class AdaptiveWatermarkGenerator {
private skewHistory: number[] = [];
private readonly historySize = 10_000;
private readonly targetCompleteness = 0.99; // 99% completeness
onEvent(eventTime: number, processingTime: number): void {
const skew = processingTime - eventTime;
this.skewHistory.push(skew);
if (this.skewHistory.length > this.historySize) {
this.skewHistory.shift();
}
}
computeAdaptiveDelay(): number {
if (this.skewHistory.length < 100) {
return 30_000; // Default 30s until enough data
}
const sorted = [...this.skewHistory].sort((a, b) => a - b);
const percentileIndex = Math.ceil(
this.targetCompleteness * sorted.length,
) - 1;
return sorted[percentileIndex];
}
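private lastEmittedWatermark: number = -Infinity;
generateWatermark(maxTimestampSeen: number): number {
const adaptiveDelay = this.computeAdaptiveDelay();
// The delay can grow between calls, so clamp to keep the watermark monotonic
const candidate = maxTimestampSeen - adaptiveDelay;
if (candidate > this.lastEmittedWatermark) {
this.lastEmittedWatermark = candidate;
}
return this.lastEmittedWatermark;
}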
}
Multi-Dimensional Watermarks
Some systems require watermarks on multiple dimensions (e.g., event time + ingestion time).
This is an active area of research for systems processing data with multiple time attributes.
Watermark Alignment Across Joins
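When joining two streams, each with its own watermark, the joined operator's watermark is: W_join = min(W_left, W_right)
This means the slower stream dictates join completeness. For interval joins, an element with timestamp t on one side can still match elements from the other side until the other side's watermark passes t + interval.
State retention for the join must account for the watermark of the other stream: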
class IntervalJoinWatermarkManager {
private leftWatermark: number = -Infinity;
private rightWatermark: number = -Infinity;
constructor(private readonly intervalMs: number) {}
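updateLeftWatermark(wm: number): void {
// Ignore regressions so that eviction decisions are never undone
this.leftWatermark = Math.max(this.leftWatermark, wm);
}
updateRightWatermark(wm: number): void {
this.rightWatermark = Math.max(this.rightWatermark, wm);
}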
/**
* Can we safely evict a left-side element with this timestamp?
* Only if no future right-side element can join with it.
*/
canEvictLeft(leftTimestamp: number): boolean {
return this.rightWatermark > leftTimestamp + this.intervalMs;
}
canEvictRight(rightTimestamp: number): boolean {
return this.leftWatermark > rightTimestamp + this.intervalMs;
}
getOutputWatermark(): number {
return Math.min(this.leftWatermark, this.rightWatermark);
}
}
Source-Specific Watermark Strategies
Kafka
Kafka provides per-partition ordering. The watermark for a Kafka source: W_source = min(W_p) over all assigned partitions p
where W_p is the watermark tracked for partition p.
Database CDC (Change Data Capture)
For CDC sources (Debezium), the watermark is based on the transaction log position.
Transaction ordering guarantees make CDC watermarks more reliable than general-purpose ones.
File Sources (Bounded)
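For bounded sources, perfect watermarks are possible: once the last record has been read, no more events can arrive, so the watermark can advance to +infinity.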
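A minimal sketch of that idea: a bounded reader returns its records followed by a final watermark of `Infinity`, signaling that nothing further can arrive and every open window may close. The `SourceElement` type and the `readBounded` function are illustrative assumptions.

```typescript
// Hypothetical element type for a source that interleaves records and watermarks.
type SourceElement<T> =
  | { kind: 'record'; value: T; timestamp: number }
  | { kind: 'watermark'; timestamp: number };

/** Emit every record of a bounded input, then a final watermark that closes all windows. */
function readBounded<T>(
  records: Array<{ value: T; timestamp: number }>,
): SourceElement<T>[] {
  const out: SourceElement<T>[] = [];
  for (let i = 0; i < records.length; i++) {
    out.push({ kind: 'record', value: records[i].value, timestamp: records[i].timestamp });
  }
  // End of input is known, so the completeness claim is exact rather than heuristic.
  out.push({ kind: 'watermark', timestamp: Infinity });
  return out;
}

const elements = readBounded([
  { value: 'click', timestamp: 3_000 },
  { value: 'view', timestamp: 1_000 }, // out of order, still covered by the final watermark
]);
console.log(elements.length);               // 3
console.log(elements[2].kind);              // watermark
console.log(elements[2].timestamp);         // Infinity
```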
Cross-References
- Windowing — How watermarks trigger window computations
- Exactly-Once Processing — Watermark consistency during checkpointing
- State Management — Watermark state in distributed operators
- Backpressure — How backpressure affects watermark propagation
- CDC Patterns — Watermarks for CDC sources
Key Takeaway
- A watermark is a monotonically non-decreasing assertion: "No more events with timestamp less than W will arrive" -- it drives window completion in event-time processing.
- The bounded out-of-orderness delay d is the single most impactful tuning parameter: too low causes data loss, too high causes unnecessary latency.
- Idle partitions can stall the global watermark; always configure idle source timeouts to exclude stalled partitions from watermark computation.
Exercise
Diagnose a Watermark Problem
Your Flink streaming pipeline processes clickstream data from 12 Kafka partitions. The operations team reports that real-time dashboards freeze for 10-15 minutes every night around midnight. Your bounded out-of-orderness is set to 30 seconds.
Given these metrics:
- Partition 0-10: watermark advancing normally (~10:04 PM)
- Partition 11: watermark stuck at 9:50 PM
- Traffic drops 95% at midnight but does not go to zero
- One automated bot system continues generating events
Diagnose the root cause and propose a fix.
Solution
Root Cause: Partition 11 receives events from a bot system with a clock skewed 10+ minutes ahead. At midnight, traffic drops and partition 11's skewed events dominate. The global watermark = min(all partitions) = partition 11's watermark, which is stuck 10-15 minutes behind real time.
Evidence: The watermark formula W = min(all partition watermarks) means a single slow partition holds back all windows across the entire pipeline.
Fix (layered approach):
- Immediate: Add per-partition idle timeout (60 seconds). If partition 11 stops producing events, it is excluded from the global watermark computation.
- Better: Implement per-source watermark tracking with outlier detection. Sources whose timestamps are more than 2 standard deviations from the median are excluded from watermark computation.
- Root cause: Fix the bot system's clock synchronization (NTP). Skewed clocks in source systems are a common source of watermark issues.
- Monitoring: Add alerts for watermarkLag > 5 minutes and watermarkAdvancementRate < 0.5.
Common Misconceptions
- "Watermarks guarantee completeness." Most production watermarks are heuristic, not perfect. Late data can and will arrive after the watermark. You MUST handle late data even with watermarks.
- "Setting a larger bounded delay makes the pipeline more correct." A larger delay improves completeness but increases latency proportionally. The right value is application-specific -- profile your actual data's skew distribution.
- "Watermarks are per-event." Watermarks are typically emitted periodically (e.g., every 200ms), not per event. Emitting one per event would add substantial overhead.
- "A stalled watermark means the pipeline is broken." It often means an idle partition is holding back the global watermark. This is an infrastructure issue, not a pipeline bug.
- "Processing-time watermarks are good enough." They provide no ordering guarantees and make event-time windowing meaningless. Use processing-time only when event timestamps are unavailable.
In Production
- Uber uses adaptive watermarks for their trip data pipeline, automatically adjusting the bounded delay based on observed p99 event-time skew across geographic regions (higher delays for regions with poor connectivity).
- Netflix monitors watermark lag as a top-level SLI for all streaming pipelines, with alerts at 5-minute lag and automatic escalation to PagerDuty at 15-minute lag.
- LinkedIn implements per-source watermark isolation for their multi-tenant streaming platform so that one slow data producer cannot freeze windows for all other tenants.
- Spotify uses punctuation-based watermarks for their internal event bus where producers embed explicit watermark markers, providing more precise completeness signals than heuristic approaches.
Quiz
1. What is the formal definition of a watermark?
A) The maximum event timestamp seen so far
B) A monotonically non-decreasing function asserting that no future events will have timestamps below the watermark value
C) The processing time minus a fixed delay
D) The average event time across all partitions
Answer
B) A watermark W(t_p) at processing time t_p asserts that all events arriving after t_p will have event times >= W(t_p). It is monotonically non-decreasing (can only advance forward).
2. In the bounded out-of-orderness strategy, how is the watermark computed?
A) W = current processing time
B) W = max(event timestamps seen) - bounded delay d
C) W = min(event timestamps seen)
D) W = average(event timestamps seen) - d
Answer
B) The watermark equals the maximum event timestamp observed so far minus the configured bounded delay d. This accounts for events that are up to d time units out of order.
3. How does a multi-input operator compute its output watermark?
A) Average of all input watermarks
B) Maximum of all input watermarks
C) Minimum of all input watermarks
D) The watermark of the fastest input
Answer
C) The output watermark is the minimum of all input watermarks: W_out = min(W_input1, W_input2, ...). This ensures the output watermark never advances past what ALL inputs can guarantee.
4. Why do idle partitions cause watermark problems?
A) They consume too much memory
B) Their frozen watermark becomes the global minimum, preventing all windows from firing
C) They generate too many events
D) They cause checkpoint failures
Answer
B) Since the global watermark = min(all partition watermarks), a partition that stops producing events has a frozen watermark. This becomes the global minimum, blocking all downstream windows from firing even though other partitions have advanced well past it.
5. What is the tradeoff between completeness and latency in watermark configuration?
A) Higher bounded delay increases both completeness and latency; lower delay decreases both
B) Higher bounded delay increases completeness but decreases latency
C) There is no tradeoff -- you can have both
D) Completeness and latency are independent
Answer
A) Increasing the bounded delay d captures more late events (higher completeness) but makes windows fire later (higher latency). Decreasing d reduces latency but risks missing late events. The optimal d depends on your application's tolerance for each.
One-Liner Summary: Watermarks are the streaming system's best guess at "all data before this time has arrived" -- tune the bounded delay by profiling your actual event-time skew distribution.