Real-Time Transaction Monitoring Pipelines

10 min read Chapter 18 of 21

Every transaction must be scored in under 100 milliseconds. The velocity features from CH6-S1 (transaction counts, amount sums, distinct merchants) must be computed in real time as events arrive. This is a stream processing problem, and the architecture decisions made here determine whether the system can sustain 50,000 transactions per second while keeping p99 latency under 50ms.

Event-Driven Architecture

The monitoring pipeline processes transaction events as they flow through the payment system:

from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import Callable
import json
import time

class EventType(Enum):
    AUTHORIZATION_REQUEST = "auth_request"
    AUTHORIZATION_RESPONSE = "auth_response"
    CLEARING = "clearing"
    CHARGEBACK = "chargeback"
    FRAUD_ALERT = "fraud_alert"

@dataclass
class TransactionEvent:
    """
    A transaction event flowing through the monitoring pipeline.
    
    Events are produced by the authorization system and consumed
    by the monitoring pipeline. The event carries all data needed
    for fraud scoring — no additional lookups should be needed
    during the hot path.
    """
    event_id: str
    event_type: EventType
    timestamp: datetime
    
    # Transaction data
    transaction_id: str
    card_hash: str
    amount: Decimal
    currency: str
    merchant_id: str
    merchant_category_code: str
    
    # Enrichment data (added by the enrichment stage)
    ip_address: str = ""
    device_fingerprint: str = ""
    ip_country: str = ""
    
    # Scoring result (added by the scoring stage)
    risk_score: float = 0.0
    decision: str = ""
    
    def to_json(self) -> str:
        return json.dumps({
            "event_id": self.event_id,
            "event_type": self.event_type.value,
            "timestamp": self.timestamp.isoformat(),
            "transaction_id": self.transaction_id,
            "card_hash": self.card_hash,
            "amount": str(self.amount),
            "currency": self.currency,
            "merchant_id": self.merchant_id,
            "mcc": self.merchant_category_code,
            "ip_address": self.ip_address,
            "device_fingerprint": self.device_fingerprint,
            "ip_country": self.ip_country,
            "risk_score": self.risk_score,
            "decision": self.decision,
        })
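
The `to_json` method above writes `amount` as a string rather than a number. A minimal sketch of why that matters, using only the standard library (the sample values are illustrative):

```python
import json
from decimal import Decimal

amount = Decimal("19.99")

# json.dumps has no built-in encoding for Decimal and raises TypeError.
try:
    json.dumps({"amount": amount})
    raised = False
except TypeError:
    raised = True

# Serializing the amount as a string preserves every digit on a round trip.
payload = json.dumps({"amount": str(amount)})
restored = Decimal(json.loads(payload)["amount"])

# Binary floats cannot represent most decimal fractions exactly,
# which is why monetary sums are kept in Decimal.
float_sum = 0.1 + 0.2
decimal_sum = Decimal("0.1") + Decimal("0.2")

print(raised, restored == amount, float_sum == 0.3, decimal_sum == Decimal("0.3"))
```

Consumers that parse these events should rebuild the amount with `Decimal(...)` from the string instead of converting through `float`.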

Kafka Topic Design

The partitioning strategy directly impacts processing semantics. For fraud detection, we need all events for a given card to be processed in order:

class KafkaTopicDesign:
    """
    Topic layout for the transaction monitoring pipeline.
    
    Key design decisions:
    
    1. Partition by card_hash: ensures all transactions for a card
       go to the same partition → same consumer → ordered processing.
       This is critical for velocity features (count must be accurate).
    
    2. Separate topics for different event types: authorization events
       need real-time processing (< 100ms), chargebacks can be processed
       with higher latency (minutes).
    
    3. Compacted topics for state: cardholder profiles and fraud labels
       use log-compacted topics where only the latest value per key
       is retained.
    """
    
    TOPICS = {
        # High-throughput, low-latency
        "txn.authorization": {
            "partitions": 256,
            "replication_factor": 3,
            "retention_ms": 7 * 24 * 3600 * 1000,  # 7 days
            "partition_key": "card_hash",
            "throughput": "50,000 events/sec",
        },
        
        # Lower throughput, higher latency tolerance
        "txn.clearing": {
            "partitions": 64,
            "replication_factor": 3,
            "retention_ms": 30 * 24 * 3600 * 1000,  # 30 days
            "partition_key": "transaction_id",
            "throughput": "10,000 events/sec",
        },
        
        # Fraud labels (log-compacted)
        "fraud.labels": {
            "partitions": 32,
            "replication_factor": 3,
            "cleanup_policy": "compact",
            "partition_key": "card_hash",
            "throughput": "100 events/sec",
        },
        
        # Scoring results (for downstream consumers)
        "txn.scored": {
            "partitions": 256,
            "replication_factor": 3,
            "retention_ms": 7 * 24 * 3600 * 1000,
            "partition_key": "card_hash",
            "throughput": "50,000 events/sec",
        },
        
        # Alerts (for case management)
        "fraud.alerts": {
            "partitions": 16,
            "replication_factor": 3,
            "retention_ms": 90 * 24 * 3600 * 1000,  # 90 days
            "partition_key": "alert_id",
            "throughput": "1,000 events/sec",
        },
    }
    
    @staticmethod
    def compute_partition(card_hash: str, num_partitions: int) -> int:
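        """
        Deterministic hash-based partition assignment.
        
        Kafka's default partitioner hashes the key bytes with murmur2.
        The loop below is a simpler multiply-by-31 hash used as a
        stand-in: the same card always maps to the same partition,
        but the result will NOT match the partition Kafka itself picks.
        """
        # Stand-in hash, not murmur2; production code should reuse Kafka's algorithm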
        h = 0
        for byte in card_hash.encode():
            h = (h * 31 + byte) & 0xFFFFFFFF
        return h % num_partitions
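
The hash loop in `compute_partition` is a multiply-by-31 hash, so its output will not line up with the partition a Kafka producer chooses for the same key. The sketch below ports 32-bit murmur2 following the structure and constants of Kafka's Java helper as commonly documented. The function names are hypothetical, and the output has not been compared against a real client here, so treat it as a starting point and verify it against your producer before relying on it.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 over raw bytes (unsigned result)."""
    seed = 0x9747B28C
    m = 0x5BD1E995
    r = 24
    mask = 0xFFFFFFFF
    length = len(data)
    h = (seed ^ length) & mask

    # Mix the input four bytes at a time, read as little-endian words.
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Fold in the trailing 1-3 bytes (mirrors a fall-through switch).
    tail = length & ~3
    rem = length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & mask

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for(card_hash: str, num_partitions: int) -> int:
    """Clear the sign bit, then take the modulus (assumed Kafka behaviour)."""
    return (murmur2(card_hash.encode("utf-8")) & 0x7FFFFFFF) % num_partitions


if __name__ == "__main__":
    print(partition_for("example-card-hash", 256))
```

Whatever hash is used, the partition count must stay fixed for ordering to hold: changing `num_partitions` remaps cards to different partitions.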

Stream Processing Pipeline

The pipeline uses a series of operators that transform the event stream:

from collections import defaultdict
from typing import Iterator

class StreamProcessor:
    """
    Real-time transaction monitoring pipeline.
    
    Pipeline stages:
    1. Deserialize: raw bytes → TransactionEvent
    2. Enrich: add device data, IP geolocation, cardholder profile
    3. Compute features: velocity, behavioral, device, graph
    4. Score: run rules + ML model
    5. Decide: approve/challenge/decline/review
    6. Route: send alerts, update counters, emit scored events
    
    Each stage is a stateless or stateful operator. Stateful operators
    (velocity counters, profiles) use local state stores with
    changelog-backed recovery.
    """
    
    def __init__(self):
        self._enrichers: list[Callable] = []
        self._feature_engines: list[Callable] = []
        self._scorer: Callable | None = None
        self._alert_router: 'AlertRouter | None' = None
        self._velocity_store = WindowedCounterStore()
    
    def add_enricher(self, enricher: Callable):
        self._enrichers.append(enricher)
    
    def add_feature_engine(self, engine: Callable):
        self._feature_engines.append(engine)
    
    def set_scorer(self, scorer: Callable):
        self._scorer = scorer
    
    def set_alert_router(self, router: 'AlertRouter'):
        self._alert_router = router
    
    def process_event(self, event: TransactionEvent) -> TransactionEvent:
        """
        Process a single transaction event through the pipeline.
        
        Target latency: < 50ms end-to-end.
        """
        start = time.monotonic()
        
        # Stage 1: Enrich
        for enricher in self._enrichers:
            event = enricher(event)
        
        # Stage 2: Compute features
        features = {}
        for engine in self._feature_engines:
            features.update(engine(event))
        
        # Stage 3: Score
        if self._scorer:
            score_result = self._scorer(event, features)
            event.risk_score = score_result["score"]
            event.decision = score_result["decision"]
        
        # Stage 4: Route alerts (>= 500 matches AlertRouter's "medium" floor)
        if self._alert_router and event.risk_score >= 500:
            self._alert_router.route_alert(event, features)
        
        # Update velocity counters (after scoring, for next transaction)
        self._velocity_store.increment(
            key=event.card_hash,
            timestamp=event.timestamp,
            amount=event.amount,
            merchant_id=event.merchant_id
        )
        
        elapsed_ms = (time.monotonic() - start) * 1000
        if elapsed_ms > 50:
            # Log slow processing for investigation
            pass
        
        return event


class WindowedCounterStore:
    """
    Sliding window counters for velocity features.
    
    Maintains counts and sums across multiple time windows for
    each card. Uses a bucketed approach for memory efficiency:
    
    Instead of storing every timestamp, divide time into buckets
    (e.g., 1-minute buckets for the 1-hour window). Each bucket
    stores the count and sum. To compute the window aggregate,
    sum the relevant buckets.
    
    Memory per card per window (count + sum only):
    - 1h window with 1-min buckets: 60 buckets × 16 bytes = 960 bytes
    - 24h window with 10-min buckets: 144 buckets × 16 bytes = 2.3 KB
    - 7d window with 1-hour buckets: 168 buckets × 16 bytes = 2.7 KB
    
    Total per card: ~6 KB (vs ~500 bytes for simple counters)
    For 100M cards: ~600 GB (requires distributed state store)
    The per-bucket merchant sets kept for distinct-merchant counts
    are extra and are not included in these figures.
    """
    
    def __init__(self):
        # card_hash -> window_name -> list of
        # (bucket_start, count, amount_sum, merchant_ids)
        self._buckets: dict[str, dict[str, list]] = defaultdict(
            lambda: defaultdict(list)
        )
        
        self._window_configs = {
            "1h": {"duration_sec": 3600, "bucket_sec": 60},
            "24h": {"duration_sec": 86400, "bucket_sec": 600},
            "7d": {"duration_sec": 604800, "bucket_sec": 3600},
        }
    
    def increment(
        self, key: str, timestamp: datetime,
        amount: Decimal, merchant_id: str
    ):
        ts = int(timestamp.timestamp())
        
        for window_name, config in self._window_configs.items():
            bucket_start = ts - (ts % config["bucket_sec"])
            buckets = self._buckets[key][window_name]
            
            # Find or create bucket
            found = False
            for i, (bs, count, total, merchants) in enumerate(buckets):
                if bs == bucket_start:
                    buckets[i] = (
                        bs, count + 1, total + float(amount),
                        merchants | {merchant_id}
                    )
                    found = True
                    break
            
            if not found:
                buckets.append(
                    (bucket_start, 1, float(amount), {merchant_id})
                )
            
            # Evict expired buckets
            cutoff = ts - config["duration_sec"]
            self._buckets[key][window_name] = [
                b for b in buckets if b[0] >= cutoff
            ]
    
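    def query(
        self, key: str, window_name: str, now: datetime | None = None
    ) -> dict:
        """
        Query current window aggregate.
        
        Buckets are evicted only when increment() runs, so a card that
        has gone quiet keeps stale buckets. Pass `now` to ignore buckets
        that have aged out of the window at query time.
        """
        buckets = self._buckets.get(key, {}).get(window_name, [])
        
        config = self._window_configs.get(window_name)
        if now is not None and config is not None:
            cutoff = int(now.timestamp()) - config["duration_sec"]
            buckets = [b for b in buckets if b[0] >= cutoff]
        
        if not buckets:
            return {
                "count": 0,
                "sum": 0.0,
                "distinct_merchants": 0,
            }
        
        total_count = sum(b[1] for b in buckets)
        total_sum = sum(b[2] for b in buckets)
        # Union the per-bucket merchant sets to count distinct merchants
        all_merchants: set = set()
        for b in buckets:
            all_merchants |= b[3]
        
        return {
            "count": total_count,
            "sum": total_sum,
            "distinct_merchants": len(all_merchants),
        }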
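
The memory estimate in the `WindowedCounterStore` docstring can be reproduced with a few lines of arithmetic. This sketch takes the window and bucket sizes from `_window_configs` and the 16 bytes per bucket from the docstring. The split into an 8-byte count and an 8-byte sum is an assumption, and the merchant sets are not included.

```python
BYTES_PER_BUCKET = 16  # assumed: 8-byte count + 8-byte amount sum
CARDS = 100_000_000    # card population used in the docstring

# window name -> (window duration in seconds, bucket width in seconds)
WINDOWS = {
    "1h": (3600, 60),
    "24h": (86400, 600),
    "7d": (604800, 3600),
}

# Number of buckets per window times bytes per bucket.
per_window_bytes = {
    name: (duration // bucket) * BYTES_PER_BUCKET
    for name, (duration, bucket) in WINDOWS.items()
}
per_card_bytes = sum(per_window_bytes.values())
total_gb = per_card_bytes * CARDS / 1e9

print(per_window_bytes)  # {'1h': 960, '24h': 2304, '7d': 2688}
print(per_card_bytes)    # 5952
print(round(total_gb, 1))
```

The result is just under 6 KB per card and just under 600 GB for 100M cards, which matches the rounded figures quoted above.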

Alert Routing and Case Management

When a transaction scores above the alert threshold, it needs to reach the right analyst queue:

@dataclass
class FraudAlert:
    alert_id: str
    transaction_id: str
    card_hash: str
    risk_score: float
    decision: str
    reasons: list[str]
    amount: Decimal
    timestamp: datetime
    priority: str        # "critical", "high", "medium", "low"
    assigned_queue: str   # Which analyst queue
    status: str = "open"  # "open", "investigating", "confirmed_fraud", "false_positive"

class AlertRouter:
    """
    Routes fraud alerts to appropriate investigation queues.
    
    Routing logic:
    - Critical (score >= 900): immediate SMS to on-call + L3 queue
    - High (700 <= score < 900): L2 analyst queue (30-min SLA)
    - Medium (500 <= score < 700): L1 analyst queue (4-hour SLA)
    - Low (score < 500): batch review queue (24-hour SLA)
    
    Alert suppression:
    - Suppress duplicate alerts for the same card within 1 hour
    - Aggregate related alerts into a single case
    """
    
    def __init__(self):
        self._recent_alerts: dict[str, datetime] = {}  # card -> last alert time
        self._alert_handlers: dict[str, Callable] = {}
    
    def register_handler(self, priority: str, handler: Callable):
        self._alert_handlers[priority] = handler
    
    def route_alert(
        self, event: TransactionEvent, features: dict
    ) -> FraudAlert | None:
        """Route an alert based on risk score and context."""
        
        # Suppression: don't alert on the same card within 1 hour
        last_alert = self._recent_alerts.get(event.card_hash)
        if last_alert and (event.timestamp - last_alert).total_seconds() < 3600:
            return None
        
        # Determine priority
        if event.risk_score >= 900:
            priority = "critical"
        elif event.risk_score >= 700:
            priority = "high"
        elif event.risk_score >= 500:
            priority = "medium"
        else:
            priority = "low"
        
        # Determine queue based on amount and pattern.
        # Compare as Decimal to avoid float rounding; the threshold is
        # not currency-normalized.
        if event.amount > Decimal("10000"):
            queue = "high_value"
        elif features.get("any_graph_fraud", 0) > 0:
            queue = "organized_fraud"
        else:
            queue = "general"
        
        alert = FraudAlert(
            alert_id=f"ALT-{event.transaction_id}",
            transaction_id=event.transaction_id,
            card_hash=event.card_hash,
            risk_score=event.risk_score,
            decision=event.decision,
            reasons=features.get("top_reasons", []),
            amount=event.amount,
            timestamp=event.timestamp,
            priority=priority,
            assigned_queue=queue,
        )
        
        # Route to handler
        handler = self._alert_handlers.get(priority)
        if handler:
            handler(alert)
        
        self._recent_alerts[event.card_hash] = event.timestamp
        
        return alert
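
The `_recent_alerts` dictionary in `AlertRouter` keeps one entry per card and nothing removes old entries, so it grows with the number of distinct cards that have ever alerted. One way to bound it is to sweep out entries older than the suppression window. The sketch below is a hypothetical helper (`AlertSuppressor` is not part of the pipeline above) that keeps the same one-hour rule and sweeps at most once per window so the hot path stays cheap.

```python
from datetime import datetime, timedelta
from typing import Optional


class AlertSuppressor:
    """Hypothetical helper: per-card alert suppression with bounded memory."""

    def __init__(self, window: timedelta = timedelta(hours=1)):
        self._window = window
        self._last_alert: dict[str, datetime] = {}
        self._last_sweep: Optional[datetime] = None

    def should_alert(self, card_hash: str, now: datetime) -> bool:
        """Return True and record the alert unless one fired within the window."""
        self._maybe_sweep(now)
        last = self._last_alert.get(card_hash)
        if last is not None and now - last < self._window:
            return False
        self._last_alert[card_hash] = now
        return True

    def _maybe_sweep(self, now: datetime) -> None:
        # Sweep at most once per window to avoid an O(n) scan on every call.
        if self._last_sweep is not None and now - self._last_sweep < self._window:
            return
        expired = [c for c, t in self._last_alert.items() if now - t >= self._window]
        for card in expired:
            del self._last_alert[card]
        self._last_sweep = now

    def __len__(self) -> int:
        return len(self._last_alert)


if __name__ == "__main__":
    s = AlertSuppressor()
    t0 = datetime(2024, 1, 1, 12, 0, 0)
    print(s.should_alert("card-a", t0))                          # first alert fires
    print(s.should_alert("card-a", t0 + timedelta(minutes=30)))  # suppressed
```

A TTL cache or a state-store entry with expiry would serve the same purpose in a distributed deployment; the in-memory dictionary is only for illustration.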

Exactly-Once Processing

Payment monitoring requires exactly-once semantics — a missed transaction means a velocity count is wrong; a double-counted transaction means a false positive:

class ExactlyOnceProcessor:
    """
    Ensures exactly-once processing semantics for the monitoring pipeline.
    
    Three approaches, in order of complexity:
    
    1. Idempotent processing: make operations idempotent so
       reprocessing produces the same result (simplest, used here)
    
    2. Kafka transactions: atomic read-process-write using
       Kafka's transactional producer (production approach)
    
    3. External state with fencing: use epoch-based fencing tokens
       to prevent zombie processors from corrupting state
    
    Kafka transactions provide the strongest guarantee but add
    ~20ms of latency per event. For fraud scoring where p99
    latency is critical, idempotent processing is preferred.
    """
    
    def __init__(self, state_store: 'StateStore'):
        self._state = state_store
        # partition -> last processed offset, restored from the last checkpoint
        self._processed_offsets: dict[int, int] = dict(
            state_store.load_offsets()
        )
    
    def process_with_dedup(
        self, partition: int, offset: int,
        event: TransactionEvent, processor: Callable
    ) -> TransactionEvent | None:
        """
        Process an event with deduplication.
        
        If this partition:offset has already been processed,
        skip it. Otherwise, process and record the offset.
        """
        last_offset = self._processed_offsets.get(partition, -1)
        
        if offset <= last_offset:
            # Already processed — skip (idempotent)
            return None
        
        result = processor(event)
        
        # Record the processed offset AFTER successful processing
        # In production: this is an atomic operation with state update
        self._processed_offsets[partition] = offset
        
        return result
    
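    def checkpoint(self):
        """
        Periodically checkpoint processed offsets to durable storage.
        
        On recovery, processing resumes from the last checkpoint.
        Events between the checkpoint and the crash are reprocessed
        (at-least-once), so correctness depends on every state update
        being idempotent. A plain counter increment is not: replayed
        events are counted twice unless the counters are checkpointed
        together with the offsets or deduplicated by event ID.
        """
        # Save a copy so later updates do not mutate the stored snapshot
        self._state.save_offsets(dict(self._processed_offsets))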

class StateStore:
    """Abstract interface for durable state storage."""
    
    def save_offsets(self, offsets: dict[int, int]):
        """Persist processed offsets for crash recovery."""
        pass
    
    def load_offsets(self) -> dict[int, int]:
        """Load last checkpointed offsets on startup."""
        return {}
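
Offset-based deduplication only protects events that arrive after the last recorded offset. Anything replayed after a crash reaches the state updates a second time, and a velocity counter that simply adds one will double count. A minimal sketch of one way to make the update itself idempotent is to remember recently applied transaction IDs per card. The class name and the `max_remembered` bound are assumptions for illustration.

```python
from collections import defaultdict, deque


class IdempotentCounter:
    """Hypothetical sketch: per-card count that ignores replayed events."""

    def __init__(self, max_remembered: int = 1000):
        self._counts: dict[str, int] = defaultdict(int)
        self._seen: dict[str, set] = defaultdict(set)
        self._order: dict[str, deque] = defaultdict(deque)
        self._max = max_remembered

    def apply(self, card_hash: str, transaction_id: str) -> bool:
        """Count the transaction once; return False if it was already applied."""
        seen = self._seen[card_hash]
        if transaction_id in seen:
            return False
        seen.add(transaction_id)
        order = self._order[card_hash]
        order.append(transaction_id)
        # Forget the oldest ID once the per-card memory bound is exceeded.
        if len(order) > self._max:
            seen.discard(order.popleft())
        self._counts[card_hash] += 1
        return True

    def count(self, card_hash: str) -> int:
        return self._counts.get(card_hash, 0)


if __name__ == "__main__":
    counter = IdempotentCounter()
    counter.apply("card-a", "txn-1")
    counter.apply("card-a", "txn-2")
    counter.apply("card-a", "txn-2")  # replay after a simulated crash
    print(counter.count("card-a"))    # 2
```

The bound trades memory for safety: a replay older than the last `max_remembered` transactions on a card would be counted again, so the bound should exceed the number of events that can fall between two checkpoints.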

Pipeline Performance Characteristics

| Metric | Target | Typical Production |
| --- | --- | --- |
| Event throughput | 50,000/sec | 30,000-80,000/sec |
| p50 latency | < 10ms | 5-8ms |
| p99 latency | < 50ms | 20-40ms |
| p99.9 latency | < 100ms | 50-80ms |
| State store size | | 300-600 GB (distributed) |
| Recovery time | < 30 sec | 10-20 sec |
| False positive rate | < 1% | 0.5-2% |
| Detection rate | > 90% | 85-95% |

The p99.9 latency is the binding constraint. A payment authorization timeout is typically 10 seconds, but the fraud scoring window is only 100ms of that. The remaining 9.9 seconds are consumed by network hops, HSM operations, and issuer processing. Every millisecond the fraud pipeline adds is a millisecond less for the rest of the authorization chain.
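
Checking the latency targets requires tail percentiles, not averages. The sketch below computes nearest-rank percentiles with integer arithmetic and compares them with the p50, p99 and p99.9 targets from the table. The helper name, the per-mille argument, and the latency samples are all illustrative assumptions; the samples are synthetic, not measurements.

```python
def percentile_permille(samples: list[float], permille: int) -> float:
    """Nearest-rank percentile; permille=999 means p99.9."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    # Ceiling division in integers avoids float rounding at the tail.
    rank = max(1, -(-permille * len(ordered) // 1000))
    return ordered[rank - 1]


# Synthetic latencies in ms: mostly fast, with a small slow tail.
latencies_ms = [8.0] * 980 + [40.0] * 15 + [90.0] * 5

# Targets from the table: (per-mille, upper bound in ms).
targets = {"p50": (500, 10.0), "p99": (990, 50.0), "p99.9": (999, 100.0)}

report = {
    name: percentile_permille(latencies_ms, pm)
    for name, (pm, _) in targets.items()
}
within_budget = all(report[name] < limit for name, (_, limit) in targets.items())

print(report)         # {'p50': 8.0, 'p99': 40.0, 'p99.9': 90.0}
print(within_budget)  # True
```

In this synthetic sample the mean is under 9 ms while p99.9 is 90 ms, which is why the tail, not the average, decides whether the 100ms budget holds.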