A Coding Implementation to Build a Unified Apache Beam Pipeline Demonstrating Batch and Stream Processing with Event-Time Windowing Using DirectRunner

These articles are AI-generated summaries. Please check the original sources for full details.

Apache Beam Unified Pipeline for Batch and Stream Processing

Apache Beam lets developers write a single pipeline that runs in both batch and streaming modes. This implementation demonstrates that with event-time windowing and uses the DirectRunner for local testing and development. The tutorial provides a fully functional example that handles on-time and late events consistently in both modes.

This matters because real-world data pipelines often need to process both historical (batch) and real-time (streaming) data with the same logic. Traditionally that has meant separate codebases and infrastructure, which increases complexity and maintenance overhead. When the batch and streaming logic diverge, results become inconsistent or effort is duplicated.

Key Insights

  • DirectRunner limitations: The DirectRunner executes pipelines on a single machine and is intended for local testing; it doesn’t fully replicate the behavior of distributed runners like Dataflow or Flink.
  • Event-time vs. Processing-time: Beam’s event-time model windows data by the timestamps carried in the records themselves, so results stay accurate even when data arrives out of order. Processing time instead uses the clock time at which the pipeline handles each element, so the same data can land in different windows depending on when it arrives.
  • Temporal: Temporal, a workflow orchestration platform, is used by companies like Stripe and Coinbase for managing complex stateful applications.
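
The event-time versus processing-time distinction can be illustrated without Beam at all. The sketch below is plain Python, not the tutorial's code: the `events` list, the `arrival_time` field, and the helper names are assumptions made up for illustration. It sums amounts into 60-second fixed windows twice, once keyed by when each event happened and once by when it arrived.

```python
from collections import defaultdict

WINDOW_SIZE_SECS = 60  # same fixed-window width as the working example


def window_start(ts, size=WINDOW_SIZE_SECS):
    """Return the start of the fixed window that contains timestamp ts."""
    return ts - (ts % size)


def sum_by_window(events, time_field):
    """Sum amounts per fixed window, keyed by the chosen time field."""
    totals = defaultdict(float)
    for event in events:
        totals[window_start(event[time_field])] += event["amount"]
    return dict(totals)


# Hypothetical events: event_time is when the event happened,
# arrival_time is when the pipeline would have seen it.
events = [
    {"amount": 10.0, "event_time": 5, "arrival_time": 6},
    {"amount": 20.0, "event_time": 50, "arrival_time": 70},  # delayed by 20 s
    {"amount": 5.0, "event_time": 65, "arrival_time": 66},
]

by_event_time = sum_by_window(events, "event_time")
by_processing_time = sum_by_window(events, "arrival_time")

print(by_event_time)
print(by_processing_time)
```

The delayed event stays in the first window under event time but moves into the second window under processing time, which is why the two totals differ.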

Working Example

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone

MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120

def make_event(user_id, event_type, amount, event_time_epoch_s):
  return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}

# BATCH_EVENTS and build_test_stream() are referenced below but not defined in
# this summary. The definitions here are illustrative stand-ins, not the
# tutorial's original data.
BASE_TS = 1_700_000_040  # hypothetical base epoch second, aligned to a 60 s boundary

BATCH_EVENTS = [
  make_event("u1", "purchase", 20, BASE_TS + 5),
  make_event("u1", "purchase", 15, BASE_TS + 20),
  make_event("u2", "purchase", 8, BASE_TS + 35),
  make_event("u1", "purchase", 30, BASE_TS + 70),
]
# Belongs to the first window but is delivered after the watermark has passed it.
LATE_EVENT = make_event("u2", "purchase", 4, BASE_TS + 40)

def _stamped(event):
  # Attach the event's own timestamp so TestStream delivers it in event time.
  return beam.window.TimestampedValue(event, event["event_time"])

def build_test_stream():
  # Replays the batch events, then one late event within the allowed lateness.
  return (
      TestStream()
      .advance_watermark_to(BASE_TS)
      .add_elements([_stamped(e) for e in BATCH_EVENTS[:3]])
      .advance_watermark_to(BASE_TS + 90)
      .add_elements([_stamped(BATCH_EVENTS[3]), _stamped(LATE_EVENT)])
      .advance_watermark_to_infinity()
  )

class WindowedUserAgg(beam.PTransform):
  def expand(self, pcoll):
    stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time"]))
    windowed = stamped | beam.WindowInto(
        FixedWindows(WINDOW_SIZE_SECS),
        allowed_lateness=ALLOWED_LATENESS_SECS,
        trigger=AfterWatermark(
            early=AfterProcessingTime(10),
            late=AfterProcessingTime(10),
        ),
        accumulation_mode=AccumulationMode.ACCUMULATING,
    )
    keyed = windowed | beam.Map(lambda e: (e["user_id"], e["amount"]))
    counts = keyed | beam.combiners.Count.PerKey()
    sums = keyed | beam.CombinePerKey(sum)
    return (
        {"count": counts, "sum_amount": sums}
        | beam.CoGroupByKey()
        | beam.Map(lambda kv: {
            "user_id": kv[0],
            "count": kv[1]["count"][0] if kv[1]["count"] else 0,
            "sum_amount": kv[1]["sum_amount"][0] if kv[1]["sum_amount"] else 0.0,
        })
    )

def run_batch():
  with beam.Pipeline(options=PipelineOptions([])) as p:
    (
        p
        | beam.Create(BATCH_EVENTS)
        | WindowedUserAgg()
        | beam.Map(json.dumps)
        | beam.Map(print)
    )

def run_stream():
  opts = PipelineOptions([])
  opts.view_as(StandardOptions).streaming = True
  with beam.Pipeline(options=opts) as p:
    (
        p
        | build_test_stream()
        | WindowedUserAgg()
        | beam.Map(json.dumps)
        | beam.Map(print)
    )

if __name__ == "__main__":
  if MODE == "stream":
    run_stream()
  else:
    run_batch()
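
The working example sets `AccumulationMode.ACCUMULATING`, which means a window can emit more than one result as its trigger fires repeatedly. The following plain-Python sketch (not Beam code; `pane_outputs` and the `batches` data are hypothetical names chosen for illustration) shows roughly how the emitted sums for one key and one window differ between accumulating and discarding modes.

```python
def pane_outputs(batches, accumulating):
    """Return the sum emitted at each trigger firing for one key and window.

    batches: list of lists; each inner list holds the amounts that arrived
    between two consecutive trigger firings.
    """
    outputs = []
    running = 0.0
    for batch in batches:
        if accumulating:
            # Each pane repeats everything seen so far in the window.
            running += sum(batch)
            outputs.append(running)
        else:
            # Each pane contains only the elements new since the last firing.
            outputs.append(float(sum(batch)))
    return outputs


# Hypothetical arrivals: two on-time amounts, then two separate late firings.
batches = [[10.0, 20.0], [5.0], [7.5]]

accumulating_panes = pane_outputs(batches, accumulating=True)
discarding_panes = pane_outputs(batches, accumulating=False)

print(accumulating_panes)
print(discarding_panes)
```

Under accumulating mode a downstream consumer would typically overwrite the previous result for the window with the latest pane, whereas under discarding mode it would add the panes together. Mixing up the two is one way totals end up double counted.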

Practical Applications

  • E-commerce Analytics: Analyzing user purchase behavior in real-time while also reprocessing historical data for long-term trends.
  • Pitfall: Misconfigured windowing or triggers can silently drop data or produce inaccurate aggregates, especially with late-arriving data. Elements that arrive after a window’s allowed lateness has expired are discarded.
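
To make the late-data pitfall concrete, here is a simplified sketch of how an element might be classified against the watermark, using the 60-second window and 120-second allowed lateness from the working example. The `classify` function is a hypothetical helper, not a Beam API, and it glosses over Beam's exact boundary handling.

```python
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120


def classify(event_time, watermark,
             size=WINDOW_SIZE_SECS, lateness=ALLOWED_LATENESS_SECS):
    """Classify an element by comparing the watermark with its window's end."""
    window_end = event_time - (event_time % size) + size
    if watermark < window_end:
        return "on_time"   # window is still open
    if watermark < window_end + lateness:
        return "late"      # window closed, but still within allowed lateness
    return "dropped"       # window expired; the element would be discarded


# An event at t=30 belongs to the window [0, 60).
print(classify(30, watermark=40))
print(classify(30, watermark=100))
print(classify(30, watermark=200))
```

Setting the allowed lateness too low for the delays a source actually exhibits moves elements from the "late" case into the "dropped" case without raising any error.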
