Derived Data: Batch and Stream Processing
Summary
This section introduces the core paradigms of derived data systems: batch and stream processing. Batch processing handles large, bounded datasets with high latency but high throughput, exemplified by a deterministic job that reads immutable click logs to generate a daily page view summary. Stream processing handles unbounded, continuous data with low latency, demonstrated by a stateful processor that maintains an aggregate view using checkpoints for idempotency. The integration of these paradigms is explored through two architectural patterns: Lambda Architecture (combining batch and speed layers) and Kappa Architecture (using a single stream processing layer with replay). Key concepts include the immutable log as a foundational data structure, the deterministic and idempotent nature of processing, and the trade-offs between latency, throughput, and operational complexity. The section establishes that derived data systems shift complexity from the read path to the write path, accepting eventual consistency for performance and scalability.
In the realm of data processing, two fundamental paradigms have emerged: batch processing and stream processing. Each resolves the tension between latency, throughput, and complexity differently: batch offers high throughput at the cost of staleness, while stream offers low latency at the cost of higher resource consumption per event. The evolution of data systems has led to architectures that combine these paradigms not as a compromise, but as a necessary synthesis to satisfy divergent access patterns. This section establishes the invariants of derived data systems, the mechanisms that enforce them, and the architectural patterns that operationalize these principles for both real-time and historical analysis.
Batch Processing
Invariant: Correctness is achieved through deterministic recomputation over immutable data.
Batch processing is a paradigm where computations are performed over large, bounded datasets in discrete, scheduled jobs. It is characterized by high latency (minutes to hours) but high throughput and efficiency. Batch jobs are the canonical mechanism for producing authoritative, complete views of data, used for analytics, reporting, and data integration tasks where processing time is secondary to accuracy and completeness. Batch processing is the natural fit for tasks that must process entire datasets, such as data warehousing, business intelligence, and machine learning model training.
Fault tolerance comes from the ability to recompute the entire view deterministically from the immutable source logs, so any intermediate failure is only a transient state. Recovery is not a special case but the expected path. A failed job is not repaired; it is rerun. The system treats failure as the default condition and guarantees consistency by reapplying the same logic to the same inputs.
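The rerun-not-repair approach can be sketched as follows, assuming the job is a pure function of its input files and publishes its output with an atomic rename. The helpers `compute_view`, `run_job`, and `run_with_retries`, the `crash`/`fail_on_attempt` fault-injection parameters, and the file layout are hypothetical, chosen only for illustration.

```python
import hashlib
import json
import os
import tempfile
from pathlib import Path
from typing import Optional


def compute_view(lines: list[str]) -> dict[str, int]:
    """Pure function of the input lines: count click events per page (hypothetical view)."""
    counts: dict[str, int] = {}
    for line in lines:
        page = json.loads(line)["page_id"]
        counts[page] = counts.get(page, 0) + 1
    return dict(sorted(counts.items()))  # stable key order -> identical output bytes


def run_job(input_dir: Path, output: Path, crash: bool = False) -> None:
    """Read every input file, recompute the view from scratch, publish it atomically."""
    lines: list[str] = []
    for path in sorted(input_dir.glob("*.jsonl")):  # sorted: file order is deterministic
        lines.extend(line for line in path.read_text().splitlines() if line.strip())
    tmp = output.with_name(output.name + ".tmp")
    tmp.write_text(json.dumps(compute_view(lines), sort_keys=True))
    if crash:
        # Simulated failure: the half-finished result never replaces the published view
        raise RuntimeError("simulated crash before publish")
    os.replace(tmp, output)  # atomic rename when tmp and output share a filesystem


def run_with_retries(input_dir: Path, output: Path, max_attempts: int = 3,
                     fail_on_attempt: Optional[int] = None) -> int:
    """Rerun the whole job on failure; nothing is patched. Returns the successful attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            run_job(input_dir, output, crash=(attempt == fail_on_attempt))
            return attempt
        except RuntimeError:
            continue  # discard the failed attempt and start again from the immutable inputs
    raise RuntimeError(f"job failed after {max_attempts} attempts")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        inputs = Path(d, "raw_clicks")
        inputs.mkdir()
        (inputs / "part-0.jsonl").write_text('{"page_id": "home"}\n{"page_id": "cart"}\n')
        (inputs / "part-1.jsonl").write_text('{"page_id": "home"}\n')
        crashed_then_rerun = Path(d, "view_a.json")
        clean = Path(d, "view_b.json")
        attempts = run_with_retries(inputs, crashed_then_rerun, fail_on_attempt=1)
        run_with_retries(inputs, clean)
        same = (hashlib.sha256(crashed_then_rerun.read_bytes()).digest()
                == hashlib.sha256(clean.read_bytes()).digest())
        print(attempts, same)  # 2 True
```

Because a failed attempt never overwrites the published view and every attempt starts from the same immutable inputs, a crashed-then-rerun job and a clean run produce byte-identical output.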
Example: Batch Recomputation Job
A batch recomputation job implements the ‘batch layer’ of the Lambda Architecture, processing immutable input files and producing a derived materialized view. The job reads the entire input dataset, applies transformations, and writes the output to a serving store. This process is deterministic: reprocessing the same input yields the same output, so rerunning the job is always safe. Consistency and reliability follow directly from immutability and idempotence.
```python
import json
from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import date
from pathlib import Path


@dataclass(frozen=True)
class ClickEvent:
    user_id: str
    page_id: str
    timestamp: str  # ISO 8601, e.g. "2024-01-15T10:30:00"
    # ... other fields


@dataclass
class PageViewSummary:
    page_id: str
    date: date
    total_views: int
    unique_users: int


class BatchViewGenerator:
    """Generates a daily page view summary from immutable raw click logs."""

    def __init__(self, input_dir: Path, output_dir: Path):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def run(self) -> None:
        """Process all input files deterministically."""
        # 1. Read immutable inputs
        all_events = self._read_input_files()

        # 2. Transform: group by (page, date), counting views and collecting distinct users
        view_counts = defaultdict(int)  # (page_id, date) -> number of click events
        users = defaultdict(set)        # (page_id, date) -> set of user_ids
        for event in all_events:
            key = (event.page_id, date.fromisoformat(event.timestamp[:10]))
            view_counts[key] += 1
            users[key].add(event.user_id)

        # 3. Produce derived materialized view, sorted so the output is reproducible
        summaries = [
            PageViewSummary(
                page_id=page_id,
                date=event_date,
                total_views=view_counts[(page_id, event_date)],
                unique_users=len(users[(page_id, event_date)]),
            )
            for page_id, event_date in sorted(view_counts)
        ]

        # 4. Write output: write a temp file, then atomically replace the previous view
        output_path = self.output_dir / "page_view_summary.json"
        tmp_path = output_path.with_name(output_path.name + ".tmp")
        with open(tmp_path, "w") as f:
            json.dump([asdict(s) for s in summaries], f, indent=2, default=str)
        tmp_path.replace(output_path)
        print(f"Batch view generated at {output_path}")

    def _read_input_files(self) -> list[ClickEvent]:
        """Read all .jsonl files from the input directory in a stable (sorted) order."""
        events = []
        for file_path in sorted(self.input_dir.glob("*.jsonl")):
            with open(file_path) as f:
                for line in f:
                    if line.strip():  # tolerate blank lines
                        events.append(ClickEvent(**json.loads(line)))
        return events


# Example usage
if __name__ == "__main__":
    # Assume immutable log files are dumped here daily
    input_path = Path("/data/raw_clicks")
    output_path = Path("/data/derived_views/batch")
    generator = BatchViewGenerator(input_path, output_path)
    generator.run()
    # This job can be re-run anytime with the same inputs, producing identical outputs (idempotent).
```
Stream Processing
Invariant: Real-time correctness is maintained through stateful, idempotent processing with durable checkpoints.
Stream processing is a paradigm where computations are performed over unbounded, continuous data streams as they arrive. It is characterized by low latency (milliseconds to seconds) and incremental processing. Stream processing is the natural mechanism for real-time monitoring, alerts, and applications requiring immediate responses to events, such as financial trading platforms, IoT sensor data processing, and social media analytics.
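Incremental processing can be illustrated with a per-key sum. In this minimal sketch (the `(key, value)` event shape and the function names are hypothetical), folding each event into running state yields, after every event, the same view a batch job would compute over the prefix of the stream seen so far.

```python
from typing import Iterable, Iterator

Event = tuple[str, int]  # hypothetical (key, value) record


def batch_sum(events: Iterable[Event]) -> dict[str, int]:
    """Batch: recompute the per-key sum from scratch over a bounded dataset."""
    totals: dict[str, int] = {}
    for key, value in events:
        totals[key] = totals.get(key, 0) + value
    return totals


def incremental_sums(events: Iterable[Event]) -> Iterator[dict[str, int]]:
    """Stream: fold each event into running state and emit the view after every event."""
    state: dict[str, int] = {}
    for key, value in events:
        state[key] = state.get(key, 0) + value  # the update itself is O(1) per event
        yield dict(state)                        # copy, so callers see a snapshot


if __name__ == "__main__":
    stream = [("a", 10), ("b", 5), ("a", 3)]
    for i, view in enumerate(incremental_sums(stream), start=1):
        # After i events, the incremental view matches a batch recompute over the first i events
        assert view == batch_sum(stream[:i])
        print(i, view)
```

The equivalence holds because addition is associative and commutative; an aggregate such as a median cannot be maintained from a single running value and needs more state.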
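As in batch processing, failure is treated as the normal case rather than the exception, so idempotency is mandatory. It is enforced by checkpointing the processor's state together with the offset of the last event reflected in that state. On recovery, the processor restores its state from the last durable checkpoint and resumes from the next offset, skipping any event at or below the checkpointed offset. Because the log is immutable and state updates are deterministic, replay reproduces the same state, which gives exactly-once effects on the processor's own state; writes to external sinks additionally require idempotent or transactional operations.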
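The recovery protocol depends on the offset and the state being persisted together. If they were written separately, a crash between the two writes could leave a stored state that already includes an event whose offset was never recorded, and that event would be applied twice on replay. A minimal sketch, assuming a local JSON file as the checkpoint store (production systems use replicated storage) and hypothetical helpers `save_checkpoint`, `load_checkpoint`, and `run_from_checkpoint`:

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Iterable


def save_checkpoint(path: Path, offset: int, state: dict[str, int]) -> None:
    """Write offset and state as ONE file, published with an atomic rename."""
    payload = json.dumps({"offset": offset, "state": state}, sort_keys=True)
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")  # same filesystem as target
    with os.fdopen(fd, "w") as f:
        f.write(payload)
        f.flush()
        os.fsync(f.fileno())  # force the bytes to disk before publishing
    os.replace(tmp, path)  # a reader sees the old checkpoint or the new one, never a mix
    # (Making the rename itself durable would also require an fsync of the directory.)


def load_checkpoint(path: Path) -> tuple[int, dict[str, int]]:
    """Return (last_processed_offset, state), or (-1, {}) if no checkpoint exists yet."""
    if not path.exists():
        return -1, {}
    data = json.loads(path.read_text())
    return data["offset"], data["state"]


def run_from_checkpoint(log: Iterable[tuple[int, str, int]], path: Path) -> dict[str, int]:
    """Resume from the last checkpoint; events at or below its offset are skipped."""
    offset, state = load_checkpoint(path)
    for event_offset, key, value in log:
        if event_offset <= offset:
            continue  # already reflected in the restored state
        state[key] = state.get(key, 0) + value
        offset = event_offset
        save_checkpoint(path, offset, state)  # checkpoint per event: simple, but slow
    return state


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        cp = Path(d, "checkpoint.json")
        log = [(1, "user_a", 10), (2, "user_b", 5), (3, "user_a", 3)]
        run_from_checkpoint(log[:2], cp)     # "crash" after offset 2
        print(run_from_checkpoint(log, cp))  # {'user_a': 13, 'user_b': 5}
```

Checkpointing after every event keeps the sketch short; real processors checkpoint periodically and accept replaying the events since the last checkpoint.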
Example: Stateful Stream Processor
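A stateful stream processor maintains a derived aggregate view by processing events from an immutable log. It achieves idempotency by tracking the offset of the last event applied to its state (restored from a checkpoint after a failure) and skipping any event at or below that offset, whether it is a duplicate delivery or a replay. After each batch it writes the derived view to a serving store. In this simplified version, state and checkpoints live in memory; a real system persists them durably.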
```python
import asyncio
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List


@dataclass(frozen=True)
class Event:
    key: str
    value: int
    offset: int  # Immutable log position; increases monotonically within a partition
    timestamp: datetime


@dataclass
class Checkpoint:
    last_processed_offset: int
    state_snapshot: Dict[str, int]


class StatefulStreamProcessor:
    """A simplified single-partition stream processor maintaining a derived aggregate view."""

    def __init__(self) -> None:
        self.state: Dict[str, int] = {}  # Derived materialized view: key -> sum
        self.last_processed_offset: int = -1  # -1 means nothing has been processed yet

    async def process_events(self, event_batch: List[Event]) -> None:
        """Process a batch of events. Idempotent: replayed or duplicate offsets are skipped."""
        for event in event_batch:
            if event.offset <= self.last_processed_offset:
                continue  # Already reflected in state (duplicate delivery or replay)
            # Deterministic state update, then advance the offset
            self.state[event.key] = self.state.get(event.key, 0) + event.value
            self.last_processed_offset = event.offset
            print(f"Processed offset {event.offset}: {event.key} += {event.value} -> {self.state[event.key]}")
        # Simulate idempotent write to output sink (e.g., key-value store)
        await self._update_materialized_view()

    async def _update_materialized_view(self) -> None:
        """Idempotent write of current state to derived data store."""
        # In reality, this would be a conditional put or upsert keyed by `key`.
        print(f"Materialized View Updated: {json.dumps(self.state, sort_keys=True)}")

    def create_checkpoint(self) -> Checkpoint:
        """Capture offset and state together; a real system persists both atomically."""
        return Checkpoint(
            last_processed_offset=self.last_processed_offset,
            state_snapshot=dict(self.state),
        )

    def restore_from_checkpoint(self, checkpoint: Checkpoint) -> None:
        """Restore state after failure; events at or below the offset will be skipped."""
        self.state = dict(checkpoint.state_snapshot)
        self.last_processed_offset = checkpoint.last_processed_offset
        print(f"Restored from checkpoint at offset {checkpoint.last_processed_offset}")


# Example usage
async def main() -> None:
    processor = StatefulStreamProcessor()
    # Simulate a stream of events from an immutable log (e.g., a Kafka partition)
    events = [
        Event(key="user_a", value=10, offset=1, timestamp=datetime.now()),
        Event(key="user_b", value=5, offset=2, timestamp=datetime.now()),
        Event(key="user_a", value=3, offset=3, timestamp=datetime.now()),
        Event(key="user_a", value=3, offset=3, timestamp=datetime.now()),  # Duplicate delivery of offset 3
    ]
    await processor.process_events(events)
    checkpoint = processor.create_checkpoint()
    print(f"Checkpoint created: {checkpoint}")

    # Simulate failure: a fresh processor restores from the checkpoint
    new_processor = StatefulStreamProcessor()
    new_processor.restore_from_checkpoint(checkpoint)
    # Replay the same events: every offset is <= 3, so nothing is double-counted
    await new_processor.process_events(events)
    assert new_processor.state == {"user_a": 13, "user_b": 5}


if __name__ == "__main__":
    asyncio.run(main())
```
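The processor's exactly-once guarantee covers its own state; writing the view to the serving store is a separate side effect that may be retried after a crash or timeout. The sketch below uses an in-memory dict as a stand-in for a key-value store, and its function names are hypothetical. It contrasts an upsert of the absolute value, which is safe to repeat, with a blind increment, which double-counts when retried, and adds an offset-conditioned put that also rejects stale writes.

```python
from typing import Optional


def upsert(store: dict[str, int], key: str, value: int) -> None:
    """Idempotent: writing the same absolute value again leaves the store unchanged."""
    store[key] = value


def increment(store: dict[str, int], key: str, delta: int) -> None:
    """Not idempotent: every retry of the same write adds the delta again."""
    store[key] = store.get(key, 0) + delta


def conditional_put(store: dict[str, tuple[int, int]], key: str, value: int, offset: int) -> bool:
    """Idempotent and order-safe: apply the write only if it comes from a newer log offset."""
    current: Optional[tuple[int, int]] = store.get(key)
    if current is not None and current[1] >= offset:
        return False  # duplicate or stale write: ignore it
    store[key] = (value, offset)
    return True


if __name__ == "__main__":
    safe: dict[str, int] = {}
    unsafe: dict[str, int] = {}
    for _ in range(2):  # the same write, retried after a timeout
        upsert(safe, "user_a", 13)
        increment(unsafe, "user_a", 3)
    print(safe, unsafe)  # {'user_a': 13} {'user_a': 6}
```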
Integration of Batch and Stream Processing
Systems that need both complete, accurate historical results and low-latency results must reconcile the two paradigms in some form. Two architectural patterns dominate: Lambda and Kappa. Both rest on the same invariant: the immutable log is the source of truth, and every view is derived from it.
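What both patterns require of the log is small: records are only appended, each gets a stable offset, and any reader can start from any offset. A minimal in-memory sketch; `AppendOnlyLog` is a hypothetical class, not the API of Kafka or any other system:

```python
from typing import Generic, Iterator, TypeVar

T = TypeVar("T")


class AppendOnlyLog(Generic[T]):
    """In-memory stand-in for an immutable log: records are appended, never updated."""

    def __init__(self) -> None:
        self._records: list[T] = []

    def append(self, record: T) -> int:
        """Append a record and return its offset (its position in the log)."""
        self._records.append(record)
        return len(self._records) - 1

    def read_from(self, offset: int = 0) -> Iterator[tuple[int, T]]:
        """Yield (offset, record) pairs from `offset` onward; offset 0 replays everything."""
        for i in range(offset, len(self._records)):
            yield i, self._records[i]

    def end_offset(self) -> int:
        """Offset that the next appended record will receive."""
        return len(self._records)


if __name__ == "__main__":
    log: AppendOnlyLog[str] = AppendOnlyLog()
    for click in ["home", "cart", "home"]:
        log.append(click)
    batch_input = [r for _, r in log.read_from(0)]  # batch: the whole bounded prefix
    stream_tail = [r for _, r in log.read_from(2)]  # stream: resume after a checkpoint at offset 1
    print(batch_input, stream_tail)  # ['home', 'cart', 'home'] ['home']
```

A batch job reads from offset 0 to the current end; a stream processor reads from just past its checkpointed offset. Both see the same records in the same order.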
Lambda Architecture
The Lambda Architecture treats the batch view as authoritative. It combines a batch layer, which periodically recomputes a complete and accurate view from the full immutable log, with a speed layer, which incrementally processes only the events that arrived after the last batch run and provides low-latency, possibly approximate results. The serving layer answers queries by combining the batch view with real-time deltas from the speed layer. Correctness follows from this design: once a batch run covers a period, its results replace the speed layer's results for that period, so any approximation is temporary and bounded by the interval between batch runs. The cost is maintaining the same logic in two code paths.
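A minimal sketch of the serving-layer merge, assuming both layers count page views, the batch view covers every event before a cutoff hour, and the speed layer buckets its increments by event hour so that buckets already covered by a batch run can be discarded. Hourly buckets and all names are hypothetical choices for illustration:

```python
from collections import defaultdict


class SpeedLayer:
    """Real-time increments bucketed by hour, so hours covered by a batch run can be dropped."""

    def __init__(self) -> None:
        self.by_hour: dict[int, dict[str, int]] = defaultdict(lambda: defaultdict(int))

    def record(self, page_id: str, hour: int) -> None:
        self.by_hour[hour][page_id] += 1

    def deltas_since(self, cutoff_hour: int) -> dict[str, int]:
        """Sum the increments for hours the batch view does not cover yet."""
        totals: dict[str, int] = defaultdict(int)
        for hour, counts in self.by_hour.items():
            if hour >= cutoff_hour:
                for page_id, n in counts.items():
                    totals[page_id] += n
        return dict(totals)

    def expire_before(self, cutoff_hour: int) -> None:
        """Discard increments that a completed batch run has recomputed exactly."""
        for hour in [h for h in self.by_hour if h < cutoff_hour]:
            del self.by_hour[hour]


def serve(batch_view: dict[str, int], batch_cutoff_hour: int, speed: SpeedLayer, page_id: str) -> int:
    """Batch result for hours < cutoff, plus speed-layer deltas for hours >= cutoff."""
    return batch_view.get(page_id, 0) + speed.deltas_since(batch_cutoff_hour).get(page_id, 0)


if __name__ == "__main__":
    speed = SpeedLayer()
    for hour, page_id in [(9, "home"), (10, "home"), (10, "cart")]:
        speed.record(page_id, hour)
    batch_view = {"home": 100, "cart": 7}  # hypothetical batch output covering hours < 10
    print(serve(batch_view, 10, speed, "home"))  # 101
    speed.expire_before(10)  # the hour-9 increment is already inside batch_view
```

Dropping covered buckets after each batch run is what bounds the speed layer's influence: any error it introduces lasts only until a batch run covers that period.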
Kappa Architecture
The Kappa Architecture enforces the invariant that all processing is stream processing. It eliminates the batch layer: historical analysis and recomputation are performed by replaying the immutable log from the beginning through the same stateful stream processor used for real-time results. This removes Lambda's dual code paths, but it requires retaining the full log and a stream processor capable of handling large-scale backfills. Kappa thus gains architectural uniformity at the operational cost of long-term log retention and replay capacity.
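One common way to run a Kappa-style backfill is to start a second instance of the processing job from the beginning of the log, write its output to a new table, and switch readers to that table once it has caught up. The sketch below compresses this into a replay over a finite list; `replay`, `sum_v1`, `sum_v2`, and `ServingLayer` are hypothetical, and a real deployment would keep the old job serving while the new one consumes the live log.

```python
from typing import Callable, Optional

Event = tuple[str, int]  # hypothetical (key, value) record
Update = Callable[[dict[str, int], Event], None]


def replay(log: list[Event], update: Update) -> dict[str, int]:
    """Rebuild a view by running processing logic over the log from offset 0."""
    view: dict[str, int] = {}
    for event in log:
        update(view, event)
    return view


def sum_v1(view: dict[str, int], event: Event) -> None:
    """Current logic: sum all values per key."""
    key, value = event
    view[key] = view.get(key, 0) + value


def sum_v2(view: dict[str, int], event: Event) -> None:
    """Hypothetical revised logic: ignore negative values."""
    key, value = event
    if value >= 0:
        view[key] = view.get(key, 0) + value


class ServingLayer:
    """Routes reads to whichever output table is currently live."""

    def __init__(self) -> None:
        self.tables: dict[str, dict[str, int]] = {}
        self.live: Optional[str] = None

    def publish(self, name: str, table: dict[str, int]) -> None:
        """Switch readers to a fully built table, then drop the previous one."""
        previous = self.live
        self.tables[name] = table
        self.live = name
        if previous is not None and previous != name:
            del self.tables[previous]

    def get(self, key: str) -> int:
        return self.tables[self.live].get(key, 0) if self.live is not None else 0


if __name__ == "__main__":
    log = [("a", 5), ("b", 2), ("a", -1)]
    serving = ServingLayer()
    serving.publish("view_v1", replay(log, sum_v1))
    print(serving.get("a"))  # 4
    # New logic: rebuild a second table from the start of the log, then cut over
    serving.publish("view_v2", replay(log, sum_v2))
    print(serving.get("a"))  # 5
```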
Conclusion
Derived data systems are defined by invariants, not components. The immutable log is the unifying foundation: it enables deterministic recomputation in batch, idempotent replay in stream, and fault tolerance by design. The trade-off between latency and throughput is not a flaw to be engineered around; it is a fundamental property of data processing systems. Batch and stream processing are not alternatives; they are complementary mechanisms for projecting derived views from the same truth. Lambda and Kappa are not competing solutions, but different resolutions of the same trade-off: dualism versus uniformity. The choice between them is dictated not by technology, but by the cost of approximation and the operational capacity to manage complexity. Systems must be built assuming failure, and correctness must follow demonstrably from determinism and immutability.