Stream Processing Semantics
Summary
This section establishes the core semantics of stream processing, focusing on the mechanisms and trade-offs for handling unbounded data. It defines **windowing** as the technique for grouping events into finite sets: tumbling (non-overlapping), sliding (overlapping), and session (activity-based) windows. **Watermarks** are introduced as monotonically increasing timestamps that signal event-time progress and help handle out-of-order data, with periodic and punctuated generation strategies. The section explains the critical distinction between **processing time** (system clock; low latency, non-deterministic) and **event time** (embedded timestamp; deterministic, more complex), highlighting the inherent latency-completeness trade-off. It covers **late data handling** via an 'allowed lateness' grace period and presents **micro-batching** as an execution model that trades latency for throughput and simpler recovery. Code examples illustrate a tumbling window assigner, a periodic watermark generator, and an event-time window manager that processes events, checks for lateness, and triggers windows based on watermarks. Key concepts such as **exactly-once semantics** (via checkpointing and idempotent writes) and **stateful processing** are also covered. Throughout, choices such as event time vs. processing time or micro-batching vs. pure streaming are framed as inherent trade-offs between latency, completeness, and complexity.
Stream Processing Semantics
Stream processing is a paradigm that enables the analysis of real-time data streams. It is characterized by low latency, high throughput, and the ability to handle unbounded, continuous data. The core of stream processing lies in its ability to handle data in motion, providing insights into events as they unfold. This section delves into the fundamental semantics of stream processing, focusing on windowing, watermarks, and the distinction between processing time and event time.
Windowing
Windowing is a technique used in stream processing to divide the stream into finite, bounded sets, called windows, for aggregation or computation. Windowing embodies the completeness-latency trade-off: larger windows or waiting for watermarks increase completeness but add latency; smaller windows or early triggers reduce latency at the risk of incomplete results. Windows can be categorized into three main types: tumbling, sliding, and session windows.
- Tumbling Windows: These are non-overlapping, fixed-size windows. Each event belongs to exactly one window, and the windows do not overlap with each other. Tumbling windows are useful for aggregations that require a fixed time interval, such as calculating the number of events per minute.
- Sliding Windows: These windows have a fixed size and a slide interval that is smaller than the window size. This means that events can belong to multiple windows, and there is an overlap between consecutive windows. Sliding windows are useful for detecting patterns or trends over a moving window of time.
- Session Windows: These are dynamic windows defined by periods of activity separated by gaps of inactivity. Session windows are useful for analyzing user sessions or other types of activity that have a clear start and end.
Example: Tumbling Window
The following Python code snippet demonstrates a simple tumbling window assignment using the TumblingWindowAssigner class:
```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class StreamEvent:
    key: str
    value: int
    event_time: datetime


class TumblingWindowAssigner:
    def __init__(self, window_size: timedelta):
        self.window_size = window_size

    def assign(self, event: StreamEvent) -> datetime:
        """Return the start of the single window that contains the event."""
        # Align all windows to a fixed origin so the mapping is deterministic.
        epoch_start = datetime.min
        delta = event.event_time - epoch_start
        window_num = delta // self.window_size  # whole windows elapsed since the origin
        window_start = epoch_start + (window_num * self.window_size)
        return window_start


# Example usage
assigner = TumblingWindowAssigner(window_size=timedelta(minutes=5))
event = StreamEvent("A", 1, datetime(2023, 1, 1, 10, 0))
window_start = assigner.assign(event)
print(f"Event assigned to window starting at {window_start}")
```
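The sliding and session windows described above can be sketched in the same style. This is a minimal illustration rather than a definitive implementation: the function names `sliding_window_starts` and `session_windows`, the alignment of windows to `datetime.min`, and the representation of a session as a (first event, last event) pair are all assumptions made for this example.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def sliding_window_starts(ts: datetime, size: timedelta, slide: timedelta) -> List[datetime]:
    """Return the start of every window [start, start + size) that contains ts."""
    origin = datetime.min  # assumed alignment origin
    # Latest slide-aligned window start at or before ts.
    start = origin + ((ts - origin) // slide) * slide
    starts = []
    # Step back one slide at a time while the window still covers ts.
    while start > ts - size:
        starts.append(start)
        if start - origin < slide:  # cannot step below the origin
            break
        start -= slide
    return sorted(starts)


def session_windows(timestamps: List[datetime], gap: timedelta) -> List[Tuple[datetime, datetime]]:
    """Group timestamps into sessions separated by more than `gap` of inactivity."""
    sessions: List[Tuple[datetime, datetime]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] <= gap:
            # Still within the inactivity gap: extend the current session.
            sessions[-1] = (sessions[-1][0], ts)
        else:
            # Gap exceeded (or first event): open a new session.
            sessions.append((ts, ts))
    return sessions


# Example usage
t = datetime(2023, 1, 1, 10, 7)
print(sliding_window_starts(t, size=timedelta(minutes=10), slide=timedelta(minutes=5)))

clicks = [datetime(2023, 1, 1, 10, 0), datetime(2023, 1, 1, 10, 2), datetime(2023, 1, 1, 10, 20)]
print(session_windows(clicks, gap=timedelta(minutes=5)))
```

With a 10-minute size and a 5-minute slide, each event falls into two windows. Unlike the fixed windows, session boundaries are only known once the gap has elapsed, which is why sessions are built by merging rather than by arithmetic on the timestamp.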
Watermarks
A watermark is a monotonically increasing timestamp that signifies that no events with a timestamp less than the watermark are expected. Watermarks are a heuristic for managing the latency-completeness trade-off: a watermark can be wrong, and late data arriving behind it is an inherent and expected outcome of the design. Watermarks are used to handle out-of-order data and to define when a window can be considered complete for processing. There are two strategies for generating watermarks: periodic and punctuated.
- Periodic Watermarks: These are generated at regular intervals based on the maximum event time seen so far, minus a maximum allowed out-of-order delay. Periodic watermarks are simple to implement but may not accurately reflect the progress of the stream.
- Punctuated Watermarks: These are generated based on special markers or events in the stream that indicate the end of a window or a specific point in time. Punctuated watermarks provide more accurate information about the stream’s progress but require additional infrastructure to generate and propagate the markers.
Example: Periodic Watermark Generation
The following Python code snippet demonstrates a simple periodic watermark generator using the PeriodicWatermarkGenerator class:
```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

# Reuses StreamEvent from the tumbling window example.


@dataclass
class Watermark:
    timestamp: datetime


class PeriodicWatermarkGenerator:
    def __init__(self, max_out_of_order: timedelta):
        self.max_out_of_order = max_out_of_order
        self.max_event_time_seen: Optional[datetime] = None

    def observe_event(self, event: StreamEvent) -> None:
        # Track the highest event time seen; it never decreases, so neither does the watermark.
        if self.max_event_time_seen is None or event.event_time > self.max_event_time_seen:
            self.max_event_time_seen = event.event_time

    def get_current_watermark(self) -> Optional[Watermark]:
        # Called at regular intervals by the runtime; returns None until an event is seen.
        if self.max_event_time_seen is None:
            return None
        wm_time = self.max_event_time_seen - self.max_out_of_order
        return Watermark(timestamp=wm_time)


# Example usage
watermark_gen = PeriodicWatermarkGenerator(max_out_of_order=timedelta(minutes=2))
watermark_gen.observe_event(StreamEvent("A", 1, datetime(2023, 1, 1, 10, 0)))
watermark = watermark_gen.get_current_watermark()
print(f"Current watermark: {watermark.timestamp}")
```
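For contrast, a punctuated generator advances the watermark only when the stream itself carries a marker. The sketch below is one possible shape, under stated assumptions: the `MarkedEvent` type and its `is_marker` flag are hypothetical stand-ins for whatever marker convention a real source would use, and the generator simply ignores markers that would move the watermark backwards.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class MarkedEvent:
    # Hypothetical event type for this sketch.
    key: str
    event_time: datetime
    is_marker: bool = False  # True when the source asserts that nothing earlier will follow


class PunctuatedWatermarkGenerator:
    def __init__(self) -> None:
        self.current: Optional[datetime] = None

    def on_event(self, event: MarkedEvent) -> Optional[datetime]:
        """Return the new watermark if this event advances it, otherwise None."""
        if not event.is_marker:
            return None  # ordinary events never move the watermark
        if self.current is not None and event.event_time <= self.current:
            return None  # keep the watermark monotonic
        self.current = event.event_time
        return self.current


# Example usage
gen = PunctuatedWatermarkGenerator()
stream = [
    MarkedEvent("A", datetime(2023, 1, 1, 10, 1)),
    MarkedEvent("end-of-minute", datetime(2023, 1, 1, 10, 5), is_marker=True),
    MarkedEvent("stale-marker", datetime(2023, 1, 1, 10, 3), is_marker=True),
]
for ev in stream:
    print(ev.key, "->", gen.on_event(ev))
```

The periodic generator derives progress from a guess about disorder, while this one relies entirely on the source emitting trustworthy markers.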
Processing Time vs. Event Time
Stream processing can be based on either processing time or event time. The choice is an inherent trade-off: processing time offers low latency and simplicity but sacrifices determinism; event time provides determinism and reproducibility at the cost of latency and complexity for handling late data.
- Processing Time: This refers to the time at which an event is observed by the stream processing system, based on the system’s local clock. Processing time is simple to implement and provides low latency but is non-deterministic and not reproducible.
- Event Time: This refers to the time at which an event actually occurred, typically embedded within the event data itself. Event time provides deterministic and reproducible results but requires handling out-of-order data and late arrivals, introducing complexity and latency.
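The difference between the two notions of time shows up as soon as an event is delayed. The following sketch counts events per minute twice, once by event time and once by arrival time. The three observations are made-up illustrative data, and `minute_bucket` is a helper introduced only for this example.

```python
from collections import Counter
from datetime import datetime

# (event_time, arrival_time) pairs. The second event arrives 90 seconds after it occurred.
observations = [
    (datetime(2023, 1, 1, 10, 0, 10), datetime(2023, 1, 1, 10, 0, 11)),
    (datetime(2023, 1, 1, 10, 0, 50), datetime(2023, 1, 1, 10, 2, 20)),
    (datetime(2023, 1, 1, 10, 1, 5), datetime(2023, 1, 1, 10, 1, 6)),
]


def minute_bucket(ts: datetime) -> datetime:
    """Truncate a timestamp to the start of its minute."""
    return ts.replace(second=0, microsecond=0)


# Event time: grouping depends only on the data, so a replay gives the same answer.
by_event_time = Counter(minute_bucket(event) for event, _ in observations)

# Processing time: grouping depends on when each event happened to arrive.
by_processing_time = Counter(minute_bucket(arrival) for _, arrival in observations)

print("event time:     ", dict(by_event_time))
print("processing time:", dict(by_processing_time))
```

Under event time the delayed event is counted in the 10:00 minute where it belongs; under processing time it is attributed to 10:02, and a rerun with different delays would produce different counts.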
Example: Event Time Processing
The following Python code snippet demonstrates a simple event-time window manager using the EventTimeWindowManager class:
```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set

# Reuses StreamEvent, TumblingWindowAssigner, and Watermark from the examples above.


class EventTimeWindowManager:
    def __init__(self, window_assigner, allowed_lateness: timedelta):
        self.window_assigner = window_assigner
        self.allowed_lateness = allowed_lateness
        self.windows: Dict[datetime, List[StreamEvent]] = {}  # window_start -> list of events
        self.window_results: Dict[datetime, int] = {}  # window_start -> aggregated result
        self.triggered_windows: Set[datetime] = set()  # windows that have already been emitted

    def process_event(self, event: StreamEvent, watermark: Optional[Watermark]) -> List[str]:
        actions = []
        window_start = self.window_assigner.assign(event)
        window_end = window_start + self.window_assigner.window_size
        already_triggered = window_start in self.triggered_windows

        # 1. Check if the event is late relative to the watermark
        is_late = watermark is not None and event.event_time < watermark.timestamp
        if is_late:
            actions.append(f"Event for window {window_start} is LATE relative to watermark {watermark.timestamp}")

        # 2. Add the event to window state
        self.windows.setdefault(window_start, []).append(event)

        # 3. Trigger the window once the watermark passes its end + allowed lateness
        if watermark is not None and not already_triggered:
            window_close_time = window_end + self.allowed_lateness
            if watermark.timestamp >= window_close_time:
                result = self._aggregate_window(window_start)
                self.window_results[window_start] = result
                self.triggered_windows.add(window_start)
                actions.append(f"TRIGGERED window {window_start} with result {result}. Watermark {watermark.timestamp} >= close time {window_close_time}")
                # Optional: garbage-collect state after triggering
                # self.windows.pop(window_start, None)

        # 4. If the window was already emitted, a late event updates its result
        elif is_late and already_triggered:
            new_result = self._aggregate_window(window_start)
            self.window_results[window_start] = new_result
            actions.append(f"UPDATED window {window_start} with late data. New result: {new_result}")

        return actions

    def _aggregate_window(self, window_start: datetime) -> int:
        # Simple aggregation: sum of values in the window
        return sum(e.value for e in self.windows.get(window_start, []))


# Example usage
window_assigner = TumblingWindowAssigner(window_size=timedelta(minutes=5))
window_manager = EventTimeWindowManager(window_assigner, allowed_lateness=timedelta(minutes=1))
actions = window_manager.process_event(
    StreamEvent("A", 1, datetime(2023, 1, 1, 10, 0)),
    Watermark(timestamp=datetime(2023, 1, 1, 10, 5)),
)
print(actions)
```
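The manager above never discards an event. A stricter reading of the 'allowed lateness' grace period is to decide, per arrival, whether the event is on time, late but still accepted, or too late to use. The sketch below shows one such policy as an assumption for illustration: the function name `classify_arrival` and the three labels are hypothetical, and real engines differ in the exact boundary conditions.

```python
from datetime import datetime, timedelta


def classify_arrival(window_end: datetime, watermark: datetime, allowed_lateness: timedelta) -> str:
    """Classify an event arriving for a window, given the current watermark."""
    if watermark < window_end:
        return "on_time"  # the window is not yet considered complete
    if watermark < window_end + allowed_lateness:
        return "late_accepted"  # inside the grace period: update the emitted result
    return "dropped"  # grace period over: window state may already be gone


# Example usage: a window ending at 10:05 with a one-minute grace period
window_end = datetime(2023, 1, 1, 10, 5)
grace = timedelta(minutes=1)
for wm in (
    datetime(2023, 1, 1, 10, 4),
    datetime(2023, 1, 1, 10, 5, 30),
    datetime(2023, 1, 1, 10, 6),
):
    print(wm.time(), classify_arrival(window_end, wm, grace))
```

A longer grace period improves completeness but keeps window state alive longer, which is the same latency-completeness trade-off expressed as a memory cost.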
Micro-batching
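Micro-batching is an execution model that collects incoming events into small batches and processes each batch as a unit. Compared with pure streaming, it accepts higher latency in exchange for higher throughput, simpler fault recovery via batch recomputation, and easier integration with batch frameworks.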
Example: Micro-batch Simulation
The following Python code snippet demonstrates a simple micro-batch simulation using the micro_batch_processing function:
```python
from datetime import datetime, timedelta
from typing import List

# Reuses StreamEvent, PeriodicWatermarkGenerator, TumblingWindowAssigner,
# and EventTimeWindowManager from the examples above.


def micro_batch_processing(events_batch: List[StreamEvent], watermark_gen: PeriodicWatermarkGenerator) -> dict:
    # 1. Update the watermark generator with every event in the batch
    for event in events_batch:
        watermark_gen.observe_event(event)

    # 2. Get the current watermark for this batch
    current_watermark = watermark_gen.get_current_watermark()

    # 3. Process each event against the batch-level watermark.
    # A fresh manager per batch keeps the simulation simple; window state
    # does not carry over between batches here.
    window_mgr = EventTimeWindowManager(
        window_assigner=TumblingWindowAssigner(window_size=timedelta(minutes=5)),
        allowed_lateness=timedelta(minutes=1),
    )
    all_actions = []
    for event in events_batch:
        actions = window_mgr.process_event(event, current_watermark)
        all_actions.extend(actions)

    return {
        "batch_size": len(events_batch),
        "watermark": current_watermark.timestamp if current_watermark else None,
        "actions": all_actions,
        "final_window_results": window_mgr.window_results,
    }


# Example usage
watermark_gen = PeriodicWatermarkGenerator(max_out_of_order=timedelta(minutes=2))
batch_results = micro_batch_processing([StreamEvent("A", 1, datetime(2023, 1, 1, 10, 0))], watermark_gen)
print(batch_results)
```
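The simulation above takes a ready-made batch as input. How a continuous stream is cut into batches is a separate step, sketched below under simplifying assumptions: the helper name `micro_batches` is hypothetical, and batches are cut by count so the example is deterministic, whereas engines often cut them on a time interval instead.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def micro_batches(stream: Iterable[T], max_batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive batches of at most max_batch_size items from the stream."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    iterator = iter(stream)
    while True:
        # Pull up to max_batch_size items without reading further ahead.
        batch = list(islice(iterator, max_batch_size))
        if not batch:
            return  # the source is exhausted
        yield batch


# Example usage: seven events cut into batches of three
for batch in micro_batches(range(7), max_batch_size=3):
    print(batch)
```

Larger batches amortize per-batch overhead and raise throughput, but every event then waits for its batch to fill, which is the latency cost described above.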
Exactly-Once Semantics
Exactly-once semantics ensure that each event affects the output exactly once, even in the face of failures. Achieving this via checkpointing provides strong correctness guarantees, but it can add latency while a checkpoint is being taken (particularly if the snapshot blocks processing), and it increases system complexity.
Example: Checkpointing
The following Python code snippet demonstrates a simple checkpointing mechanism using the Checkpoint class:
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class StreamEvent:
    # Offset-carrying variant of the event type used in the earlier examples.
    key: str
    value: int
    offset: int  # position in the source log, used to detect replayed events
    timestamp: datetime


@dataclass
class Checkpoint:
    last_processed_offset: int
    state_snapshot: Dict[str, int]


class StatefulStreamProcessor:
    def __init__(self):
        self.state: Dict[str, int] = {}  # Derived materialized view: key -> sum
        self.last_checkpoint: Optional[Checkpoint] = None

    def process_events(self, event_batch: List[StreamEvent]) -> None:
        # Ensure idempotency: skip events already processed according to the checkpoint.
        start_offset = self.last_checkpoint.last_processed_offset if self.last_checkpoint else -1
        events_to_process = [e for e in event_batch if e.offset > start_offset]

        for event in events_to_process:
            # Deterministic state update
            self.state[event.key] = self.state.get(event.key, 0) + event.value
            print(f"Processed offset {event.offset}: {event.key} += {event.value} -> {self.state[event.key]}")

        # Simulate idempotent write to output sink (e.g., key-value store)
        self._update_materialized_view()

    def _update_materialized_view(self) -> None:
        # Idempotent write logic
        pass
```
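The processor above consults a checkpoint but never takes or restores one. The following self-contained sketch shows how those two steps could fit together so that a replay after a crash does not double-count. It is an illustration under stated assumptions, not a definitive design: the `Snapshot` and `CheckpointingCounter` names are hypothetical, records are plain `(offset, key, value)` tuples, and the source is assumed to replay the log from the beginning after recovery.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Snapshot:
    last_offset: int
    state: Dict[str, int]


class CheckpointingCounter:
    def __init__(self) -> None:
        self.state: Dict[str, int] = {}
        self.last_offset = -1

    def process(self, records: List[Tuple[int, str, int]]) -> None:
        for offset, key, value in records:
            if offset <= self.last_offset:
                continue  # already reflected in state: skip the replayed record
            self.state[key] = self.state.get(key, 0) + value
            self.last_offset = offset

    def checkpoint(self) -> Snapshot:
        # Copy the state so later updates cannot alter the snapshot.
        return Snapshot(self.last_offset, dict(self.state))

    def restore(self, snapshot: Snapshot) -> None:
        self.state = dict(snapshot.state)
        self.last_offset = snapshot.last_offset


log = [(0, "A", 1), (1, "B", 5), (2, "A", 2), (3, "A", 4)]

worker = CheckpointingCounter()
worker.process(log[:2])
snap = worker.checkpoint()
worker.process(log[2:3])  # progress made after the checkpoint, lost in the simulated crash

recovered = CheckpointingCounter()
recovered.restore(snap)
recovered.process(log)  # the source replays everything; offsets 0 and 1 are skipped
print(recovered.state, recovered.last_offset)
```

The key point is that the offset and the state are saved together: restoring one without the other would either drop records or count them twice.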
Stateful Stream Processing
Stateful stream processing refers to the ability of stream processors to maintain mutable state across events. This is essential for complex aggregations, joins, and other operations that require knowledge of previous events. The cost is that this state must be managed and checkpointed; it can grow without bound and requires careful garbage collection.
Example: Stateful Stream Processor
The following Python code snippet demonstrates a simple stateful stream processor using the StatefulStreamProcessor class:
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

# Reuses the Checkpoint class from the checkpointing example.


@dataclass
class StreamEvent:
    key: str
    value: int
    offset: int
    timestamp: datetime


class StatefulStreamProcessor:
    def __init__(self):
        self.state: Dict[str, int] = {}  # Derived materialized view: key -> sum
        self.last_checkpoint: Optional[Checkpoint] = None

    def process_events(self, event_batch: List[StreamEvent]) -> None:
        # Ensure idempotency: skip events already processed according to the checkpoint.
        start_offset = self.last_checkpoint.last_processed_offset if self.last_checkpoint else -1
        events_to_process = [e for e in event_batch if e.offset > start_offset]

        for event in events_to_process:
            # Deterministic state update: the running sum persists across events and batches
            self.state[event.key] = self.state.get(event.key, 0) + event.value
            print(f"Processed offset {event.offset}: {event.key} += {event.value} -> {self.state[event.key]}")

        # Simulate idempotent write to output sink (e.g., key-value store)
        self._update_materialized_view()

    def _update_materialized_view(self) -> None:
        # Idempotent write logic
        pass


# Example usage
processor = StatefulStreamProcessor()
processor.process_events([
    StreamEvent("A", 1, offset=0, timestamp=datetime(2023, 1, 1, 10, 0)),
    StreamEvent("A", 2, offset=1, timestamp=datetime(2023, 1, 1, 10, 1)),
])
print(processor.state)
```
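Because per-key state can grow without bound, some form of garbage collection is needed. One simple approach is a time-to-live driven by the watermark, sketched below. The class name `ExpiringKeyedState`, the `ttl` parameter, and the rule of expiring a key once the watermark passes its last update plus the TTL are assumptions chosen for this illustration.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Tuple


class ExpiringKeyedState:
    def __init__(self, ttl: timedelta) -> None:
        self.ttl = ttl
        # key -> (running sum, event time of the most recent update)
        self._entries: Dict[str, Tuple[int, datetime]] = {}

    def add(self, key: str, value: int, event_time: datetime) -> int:
        """Add value to the key's running sum and return the new total."""
        total, last_seen = self._entries.get(key, (0, event_time))
        new_total = total + value
        self._entries[key] = (new_total, max(last_seen, event_time))
        return new_total

    def expire(self, watermark: datetime) -> List[str]:
        """Remove keys whose last update is at least ttl behind the watermark."""
        stale = [k for k, (_, last) in self._entries.items() if last + self.ttl <= watermark]
        for key in stale:
            del self._entries[key]
        return stale

    def __len__(self) -> int:
        return len(self._entries)


# Example usage
state = ExpiringKeyedState(ttl=timedelta(minutes=5))
state.add("A", 1, datetime(2023, 1, 1, 10, 0))
state.add("B", 2, datetime(2023, 1, 1, 10, 9))
removed = state.expire(watermark=datetime(2023, 1, 1, 10, 10))
print(removed, len(state))
```

Tying expiry to the watermark rather than the wall clock keeps cleanup consistent with event-time semantics: a key is only dropped once the stream itself has moved past it.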