Skip to main content
data systems mechanics invariants in distributed architectures

Stream Processing Semantics

9 min read Chapter 23 of 28
Summary

This section establishes the core semantics of stream...

This section establishes the core semantics of stream processing, focusing on the mechanisms and trade-offs for handling unbounded data. It defines **windowing** as the technique to group events into finite sets: tumbling (non-overlapping), sliding (overlapping), and session (activity-based) windows. **Watermarks** are introduced as monotonic timestamps that signal progress and handle out-of-order data, with periodic and punctuated generation strategies. The critical distinction between **processing time** (system clock, low-latency, non-deterministic) and **event time** (embedded timestamp, deterministic, complex) is explained, highlighting the inherent latency-completeness trade-off. The section integrates **late data handling** via an 'allowed lateness' grace period and demonstrates **micro-batching** as an execution model that trades latency for throughput and simpler recovery. Code examples illustrate a tumbling window assigner, periodic watermark generator, and an event-time window manager that processes events, checks for lateness, and triggers windows based on watermarks. Key concepts like **exactly-once semantics** (via checkpointing/idempotent writes) and **stateful processing** are also covered. The narrative consistently frames choices, such as event vs. processing time or micro-batching vs. pure streaming, as immutable trade-offs between latency, completeness, and complexity.

Stream Processing Semantics

Stream processing is a paradigm that enables the analysis of real-time data streams. It is characterized by low latency, high throughput, and the ability to handle unbounded, continuous data. The core of stream processing lies in its ability to handle data in motion, providing insights into events as they unfold. This section delves into the fundamental semantics of stream processing, focusing on windowing, watermarks, and the distinction between processing time and event time.

Windowing

Windowing is a technique used in stream processing to divide the stream into finite, bounded sets, called windows, for aggregation or computation. Windowing embodies the completeness-latency trade-off: larger windows or waiting for watermarks increase completeness but add latency; smaller windows or early triggers reduce latency at the risk of incomplete results. Windows can be categorized into three main types: tumbling, sliding, and session windows.

  • Tumbling Windows: These are non-overlapping, fixed-size windows. Each event belongs to exactly one window, and the windows do not overlap with each other. Tumbling windows are useful for aggregations that require a fixed time interval, such as calculating the number of events per minute.

  • Sliding Windows: These windows have a fixed size and a slide interval that is smaller than the window size. This means that events can belong to multiple windows, and there is an overlap between consecutive windows. Sliding windows are useful for detecting patterns or trends over a moving window of time.

  • Session Windows: These are dynamic windows defined by periods of activity separated by gaps of inactivity. Session windows are useful for analyzing user sessions or other types of activity that have a clear start and end.

Example: Tumbling Window

The following Python code snippet demonstrates a simple tumbling window assignment using the TumblingWindowAssigner class:

from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass(frozen=True)
class StreamEvent:
    key: str
    value: int
    event_time: datetime

class TumblingWindowAssigner:
    def __init__(self, window_size: timedelta):
        self.window_size = window_size
    
    def assign(self, event: StreamEvent) -> datetime:
        epoch_start = datetime.min
        delta = event.event_time - epoch_start
        window_num = delta // self.window_size
        window_start = epoch_start + (window_num * self.window_size)
        return window_start

# Example usage
assigner = TumblingWindowAssigner(window_size=timedelta(minutes=5))
event = StreamEvent("A", 1, datetime(2023, 1, 1, 10, 0))
window_start = assigner.assign(event)
print(f"Event assigned to window starting at {window_start}")

Watermarks

A watermark is a monotonically increasing timestamp that signifies that no events with a timestamp less than the watermark are expected. Watermarks are a heuristic mechanism for managing the latency-completeness trade-off: being wrong—allowing late data—is an inherent and expected outcome of watermark design. Watermarks are used to handle out-of-order data and define when a window can be considered complete for processing. There are two types of watermarks: periodic and punctuated.

  • Periodic Watermarks: These are generated at regular intervals based on the maximum event time seen so far, minus a maximum allowed out-of-order delay. Periodic watermarks are simple to implement but may not accurately reflect the progress of the stream.

  • Punctuated Watermarks: These are generated based on special markers or events in the stream that indicate the end of a window or a specific point in time. Punctuated watermarks provide more accurate information about the stream’s progress but require additional infrastructure to generate and propagate the markers.

Example: Periodic Watermark Generation

The following Python code snippet demonstrates a simple periodic watermark generator using the PeriodicWatermarkGenerator class:

from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class Watermark:
    timestamp: datetime

class PeriodicWatermarkGenerator:
    def __init__(self, max_out_of_order: timedelta):
        self.max_out_of_order = max_out_of_order
        self.max_event_time_seen: datetime | None = None
    
    def observe_event(self, event: StreamEvent) -> None:
        if self.max_event_time_seen is None or event.event_time > self.max_event_time_seen:
            self.max_event_time_seen = event.event_time
    
    def get_current_watermark(self) -> Watermark | None:
        if self.max_event_time_seen is None:
            return None
        wm_time = self.max_event_time_seen - self.max_out_of_order
        return Watermark(timestamp=wm_time)

# Example usage
watermark_gen = PeriodicWatermarkGenerator(max_out_of_order=timedelta(minutes=2))
watermark_gen.observe_event(StreamEvent("A", 1, datetime(2023, 1, 1, 10, 0)))
watermark = watermark_gen.get_current_watermark()
print(f"Current watermark: {watermark.timestamp}")

Processing Time vs. Event Time

Stream processing can be based on either processing time or event time. The choice is an immutable trade-off: processing time offers low latency and simplicity but sacrifices determinism; event time provides determinism and reproducibility at the cost of latency and complexity for handling late data.

  • Processing Time: This refers to the time at which an event is observed by the stream processing system, based on the system’s local clock. Processing time is simple to implement and provides low latency but is non-deterministic and not reproducible.

  • Event Time: This refers to the time at which an event actually occurred, typically embedded within the event data itself. Event time provides deterministic and reproducible results but requires handling out-of-order data and late arrivals, introducing complexity and latency.

Example: Event Time Processing

The following Python code snippet demonstrates a simple event-time window manager using the EventTimeWindowManager class:

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List

@dataclass
class StreamEvent:
    key: str
    value: int
    event_time: datetime

class EventTimeWindowManager:
    def __init__(self, window_assigner, allowed_lateness: timedelta):
        self.window_assigner = window_assigner
        self.allowed_lateness = allowed_lateness
        self.windows = {}  # window_start -> list of events
        self.window_results = {}  # window_start -> aggregated result
        self.triggered_windows = set()  # Windows that have already been emitted
    
    def process_event(self, event: StreamEvent, watermark: Watermark | None) -> List[str]:
        actions = []
        window_start = self.window_assigner.assign(event)
        window_end = window_start + self.window_assigner.window_size
        
        # 1. Check if event is late relative to watermark
        is_late = False
        if watermark is not None and event.event_time < watermark.timestamp:
            is_late = True
            actions.append(f"Event for window {window_start} is LATE relative to watermark {watermark.timestamp}")
        
        # 2. Add event to window state
        if window_start not in self.windows:
            self.windows[window_start] = []
        self.windows[window_start].append(event)
        
        # 3. Trigger window if watermark passed its end + allowed lateness
        if watermark is not None:
            window_close_time = window_end + self.allowed_lateness
            if watermark.timestamp >= window_close_time and window_start not in self.triggered_windows:
                result = self._aggregate_window(window_start)
                self.window_results[window_start] = result
                self.triggered_windows.add(window_start)
                actions.append(f"TRIGGERED window {window_start} with result {result}. Watermark {watermark.timestamp} >= close time {window_close_time}")
                # Optional: Garbage collect state after triggering
                # self.windows.pop(window_start, None)
        
        # 4. If event is late but window not yet closed, update result
        if is_late and window_start in self.window_results:
            # Re-aggregate with the new late event
            new_result = self._aggregate_window(window_start)
            self.window_results[window_start] = new_result
            actions.append(f"UPDATED window {window_start} with late data. New result: {new_result}")
        
        return actions
    
    def _aggregate_window(self, window_start: datetime) -> int:
        # Simple aggregation: sum of values in the window
        return sum(e.value for e in self.windows.get(window_start, []))

# Example usage
window_assigner = TumblingWindowAssigner(window_size=timedelta(minutes=5))
window_manager = EventTimeWindowManager(window_assigner, allowed_lateness=timedelta(minutes=1))
window_manager.process_event(StreamEvent("A", 1, datetime(2023, 1, 1, 10, 0)), Watermark(timestamp=datetime(2023, 1, 1, 10, 5)))

Micro-batching

Micro-batching is an execution model that trades off latency (higher than pure streaming) for throughput, simpler fault recovery via batch recomputation, and easier integration with batch frameworks.

Example: Micro-batch Simulation

The following Python code snippet demonstrates a simple micro-batch simulation using the micro_batch_processing function:

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List

@dataclass
class StreamEvent:
    key: str
    value: int
    event_time: datetime

def micro_batch_processing(events_batch: List[StreamEvent], watermark_gen: PeriodicWatermarkGenerator) -> dict:
    # 1. Update watermark generator with batch events
    for event in events_batch:
        watermark_gen.observe_event(event)
    
    # 2. Get current watermark for this batch
    current_watermark = watermark_gen.get_current_watermark()
    
    # 3. Process each event (simulating parallel processing)
    window_mgr = EventTimeWindowManager(
        window_assigner=TumblingWindowAssigner(window_size=timedelta(minutes=5)),
        allowed_lateness=timedelta(minutes=1)
    )
    all_actions = []
    for event in events_batch:
        actions = window_mgr.process_event(event, current_watermark)
        all_actions.extend(actions)
    
    return {
        "batch_size": len(events_batch),
        "watermark": current_watermark.timestamp if current_watermark else None,
        "actions": all_actions,
        "final_window_results": window_mgr.window_results
    }

# Example usage
watermark_gen = PeriodicWatermarkGenerator(max_out_of_order=timedelta(minutes=2))
batch_results = micro_batch_processing([StreamEvent("A", 1, datetime(2023, 1, 1, 10, 0))], watermark_gen)
print(batch_results)

Exactly-Once Semantics

Exactly-once semantics in streams ensure that each event affects the output exactly once, even in the face of failures. Exactly-once semantics via checkpointing provides strong correctness guarantees but introduces latency amplification during the blocking checkpoint operation and increases system complexity.

Example: Checkpointing

The following Python code snippet demonstrates a simple checkpointing mechanism using the Checkpoint class:

from dataclasses import dataclass
from datetime import datetime
from typing import Dict

@dataclass
class Checkpoint:
    last_processed_offset: int
    state_snapshot: Dict[str, int]

class StatefulStreamProcessor:
    def __init__(self):
        self.state: Dict[str, int] = {}  # Derived materialized view: key -> sum
        self.last_checkpoint: Checkpoint | None = None

    def process_events(self, event_batch: List[StreamEvent]) -> None:
        # Ensure idempotency: skip events already processed according to checkpoint.
        start_offset = self.last_checkpoint.last_processed_offset if self.last_checkpoint else -1
        events_to_process = [e for e in event_batch if e.offset > start_offset]

        for event in events_to_process:
            # Deterministic state update
            self.state[event.key] = self.state.get(event.key, 0) + event.value
            print(f"Processed offset {event.offset}: {event.key} += {event.value} -> {self.state[event.key]}")

        # Simulate idempotent write to output sink (e.g., key-value store)
        self._update_materialized_view()

    def _update_materialized_view(self) -> None:
        # Idempotent write logic
        pass

Stateful Stream Processing

Stateful stream processing refers to the ability of stream processors to maintain mutable state across events. Stateful processing enables complex operations but introduces the cost of managing and checkpointing mutable state, which can grow unbounded and requires careful garbage collection. This is essential for complex aggregations, joins, and other operations that require knowledge of previous events.

Example: Stateful Stream Processor

The following Python code snippet demonstrates a simple stateful stream processor using the StatefulStreamProcessor class:

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List

@dataclass
class StreamEvent:
    key: str
    value: int
    offset: int
    timestamp: datetime

class StatefulStreamProcessor:
    def __init__(self):
        self.state: Dict[str, int] = {}  # Derived materialized view: key -> sum
        self.last_checkpoint: Checkpoint | None = None

    def process_events(self, event_batch: List[StreamEvent]) -> None:
        # Ensure idempotency: skip events already processed according to checkpoint.
        start_offset = self.last_checkpoint.last_processed_offset if self.last_checkpoint else -1
        events_to_process = [e for e in event_batch if e.offset > start_offset]

        for event in events_to_process:
            # Deterministic state update
            self.state[event.key] = self.state.get(event.key, 0) + event.value
            print(f"Processed offset {event.offset}: {event.key} += {event.value} -> {self.state[event.key]}")

        # Simulate idempotent write to output sink (e.g., key-value store)
        self._update_materialized_view()

    def _update_materialized_view(self) -> None:
        # Idempotent write logic
        pass

Sources