Real-Time Transaction Monitoring Pipelines
Every transaction must be scored in under 100 milliseconds. The velocity features from CH6-S1 — transaction counts, amount sums, distinct merchants — must be computed in real-time as events arrive. This is a stream processing problem, and the architecture decisions made here determine whether the system can handle 50,000 transactions per second at p99 latency under 50ms.
Event-Driven Architecture
The monitoring pipeline processes transaction events as they flow through the payment system:
import json
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import Callable
class EventType(Enum):
    """Kind of event carried by a TransactionEvent.

    Each member's value is the string written to the ``event_type`` key
    by ``TransactionEvent.to_json``.
    """

    AUTHORIZATION_REQUEST = "auth_request"
    AUTHORIZATION_RESPONSE = "auth_response"
    CLEARING = "clearing"
    CHARGEBACK = "chargeback"
    FRAUD_ALERT = "fraud_alert"
@dataclass
class TransactionEvent:
    """
    A transaction event flowing through the monitoring pipeline.

    Events are produced by the authorization system and consumed by the
    monitoring pipeline. The event is meant to carry all data needed for
    fraud scoring, so that no additional lookups are needed on the hot path.

    Enrichment and scoring fields default to empty/zero and are filled in by
    later pipeline stages.
    """

    event_id: str
    event_type: EventType
    timestamp: datetime

    # Transaction data
    transaction_id: str
    card_hash: str
    amount: Decimal
    currency: str
    merchant_id: str
    merchant_category_code: str

    # Enrichment data (added by the enrichment stage)
    ip_address: str = ""
    device_fingerprint: str = ""
    ip_country: str = ""

    # Scoring result (added by the scoring stage)
    risk_score: float = 0.0
    decision: str = ""

    def to_json(self) -> str:
        """Serialize the event, including enrichment and scoring fields, to JSON.

        Encoding choices:
        - ``amount`` is emitted as a string so Decimal precision survives.
        - ``timestamp`` is emitted in ISO-8601 form.
        - ``event_type`` is emitted as its enum value.
        - ``merchant_category_code`` is emitted under the short key ``"mcc"``.

        All three enrichment fields are included. ``ip_country`` was
        previously dropped, so downstream consumers lost the geolocation result.
        """
        return json.dumps({
            "event_id": self.event_id,
            "event_type": self.event_type.value,
            "timestamp": self.timestamp.isoformat(),
            "transaction_id": self.transaction_id,
            "card_hash": self.card_hash,
            "amount": str(self.amount),
            "currency": self.currency,
            "merchant_id": self.merchant_id,
            "mcc": self.merchant_category_code,
            "ip_address": self.ip_address,
            "device_fingerprint": self.device_fingerprint,
            "ip_country": self.ip_country,
            "risk_score": self.risk_score,
            "decision": self.decision,
        })
Kafka Topic Design
The partitioning strategy directly determines processing semantics. For fraud detection, all events for a given card must be processed in order, which Kafka only guarantees within a single partition:
class KafkaTopicDesign:
    """
    Topic layout for the transaction monitoring pipeline.

    Key design decisions:
    1. Partition by card_hash: ensures all transactions for a card
       go to the same partition → same consumer → ordered processing.
       This is critical for velocity features (count must be accurate).
    2. Separate topics for different event types: authorization events
       need real-time processing (< 100ms), chargebacks can be processed
       with higher latency (minutes).
    3. Compacted topics for state: cardholder profiles and fraud labels
       use log-compacted topics where only the latest value per key
       is retained.
    """

    TOPICS = {
        # High-throughput, low-latency
        "txn.authorization": {
            "partitions": 256,
            "replication_factor": 3,
            "retention_ms": 7 * 24 * 3600 * 1000,  # 7 days
            "partition_key": "card_hash",
            "throughput": "50,000 events/sec",
        },
        # Lower throughput, higher latency tolerance
        "txn.clearing": {
            "partitions": 64,
            "replication_factor": 3,
            "retention_ms": 30 * 24 * 3600 * 1000,  # 30 days
            "partition_key": "transaction_id",
            "throughput": "10,000 events/sec",
        },
        # Fraud labels (log-compacted)
        "fraud.labels": {
            "partitions": 32,
            "replication_factor": 3,
            "cleanup_policy": "compact",
            "partition_key": "card_hash",
            "throughput": "100 events/sec",
        },
        # Scoring results (for downstream consumers)
        "txn.scored": {
            "partitions": 256,
            "replication_factor": 3,
            "retention_ms": 7 * 24 * 3600 * 1000,
            "partition_key": "card_hash",
            "throughput": "50,000 events/sec",
        },
        # Alerts (for case management)
        "fraud.alerts": {
            "partitions": 16,
            "replication_factor": 3,
            "retention_ms": 90 * 24 * 3600 * 1000,  # 90 days
            "partition_key": "alert_id",
            "throughput": "1,000 events/sec",
        },
    }

    @staticmethod
    def _murmur2(data: bytes) -> int:
        """Return Kafka's murmur2 hash of *data* as an unsigned 32-bit int.

        Port of ``org.apache.kafka.common.utils.Utils.murmur2``. Java's
        32-bit ``int`` overflow is emulated by masking every product to
        32 bits. The returned bit pattern therefore equals the Java result,
        read as unsigned.
        """
        mask = 0xFFFFFFFF
        m = 0x5BD1E995
        r = 24
        length = len(data)
        h = (0x9747B28C ^ length) & mask

        # Body: process 4-byte little-endian words.
        body_len = length - (length % 4)
        for i in range(0, body_len, 4):
            k = int.from_bytes(data[i:i + 4], "little")
            k = (k * m) & mask
            k ^= k >> r
            k = (k * m) & mask
            h = (h * m) & mask
            h ^= k

        # Tail: mirrors the Java switch fall-through (case 3 → 2 → 1).
        tail = length % 4
        if tail == 3:
            h ^= data[body_len + 2] << 16
        if tail >= 2:
            h ^= data[body_len + 1] << 8
        if tail >= 1:
            h ^= data[body_len]
            h = (h * m) & mask

        # Final avalanche.
        h ^= h >> 13
        h = (h * m) & mask
        h ^= h >> 15
        return h

    @staticmethod
    def compute_partition(card_hash: str, num_partitions: int) -> int:
        """
        Return the partition Kafka's default partitioner picks for this key.

        Kafka assigns a keyed record to
        ``toPositive(murmur2(key_bytes)) % num_partitions``, with the key
        serialized as UTF-8. The previous implementation used a ``h*31 + b``
        polynomial hash. That is NOT murmur2, so its result disagreed with
        the partition the producer actually wrote to.
        """
        h = KafkaTopicDesign._murmur2(card_hash.encode("utf-8"))
        # toPositive(): clear the sign bit, as Kafka's Utils.toPositive does.
        return (h & 0x7FFFFFFF) % num_partitions
Stream Processing Pipeline
The pipeline uses a series of operators that transform the event stream:
from collections import defaultdict
from typing import Iterator
class StreamProcessor:
    """
    Real-time transaction monitoring pipeline.

    ``process_event`` takes an already-deserialized ``TransactionEvent``;
    the raw bytes → event step happens before this class. It then runs
    these stages:

    1. Enrich: each registered enricher receives the event and returns the
       event handed to the next enricher.
    2. Compute features: the dicts returned by the feature engines are
       merged, and later engines overwrite keys from earlier ones.
    3. Score + decide: the scorer returns a mapping with "score" and
       "decision", which are copied onto the event.
    4. Route: events with ``risk_score >= ALERT_THRESHOLD`` go to the alert
       router, if one is set.
    5. Update velocity counters for the card. This runs after scoring, so a
       transaction only affects the features of later transactions.

    Velocity state lives in an in-process ``WindowedCounterStore``. The
    production design calls for a local state store with changelog-backed
    recovery; this class does not implement that recovery.
    """

    # Lowest risk score that produces an alert. It matches AlertRouter's
    # lowest alerting tier ("medium", score >= 500), so a score of exactly
    # 500 is routed rather than silently dropped.
    ALERT_THRESHOLD = 500
    # Per-event latency budget in milliseconds. Slower events are logged.
    SLOW_EVENT_MS = 50

    _logger = logging.getLogger(__name__)

    def __init__(self):
        self._enrichers: list[Callable] = []
        self._feature_engines: list[Callable] = []
        self._scorer: Callable | None = None
        self._alert_router: 'AlertRouter | None' = None
        self._velocity_store = WindowedCounterStore()

    @property
    def velocity_store(self) -> 'WindowedCounterStore':
        """Velocity counters updated by ``process_event`` (read access for feature engines)."""
        return self._velocity_store

    def add_enricher(self, enricher: Callable):
        """Append an enricher. Enrichers run in registration order."""
        self._enrichers.append(enricher)

    def add_feature_engine(self, engine: Callable):
        """Append a feature engine. It must return a dict of features."""
        self._feature_engines.append(engine)

    def set_scorer(self, scorer: Callable):
        """Set the scorer, called as ``scorer(event, features)``."""
        self._scorer = scorer

    def set_alert_router(self, router: 'AlertRouter'):
        """Set the router that receives high-risk events."""
        self._alert_router = router

    def process_event(self, event: TransactionEvent) -> TransactionEvent:
        """
        Process a single transaction event through the pipeline.

        Target latency: < ``SLOW_EVENT_MS`` end-to-end. Events that exceed
        it are logged at WARNING level for investigation.

        Returns the (possibly enricher-replaced) event, with ``risk_score``
        and ``decision`` set when a scorer is configured.
        """
        start = time.monotonic()

        # Stage 1: Enrich
        for enricher in self._enrichers:
            event = enricher(event)

        # Stage 2: Compute features
        features: dict = {}
        for engine in self._feature_engines:
            features.update(engine(event))

        # Stage 3: Score
        if self._scorer:
            score_result = self._scorer(event, features)
            event.risk_score = score_result["score"]
            event.decision = score_result["decision"]

        # Stage 4: Route alerts
        if self._alert_router and event.risk_score >= self.ALERT_THRESHOLD:
            self._alert_router.route_alert(event, features)

        # Update velocity counters after scoring, so that this transaction
        # only influences the features of subsequent transactions.
        self._velocity_store.increment(
            key=event.card_hash,
            timestamp=event.timestamp,
            amount=event.amount,
            merchant_id=event.merchant_id,
        )

        elapsed_ms = (time.monotonic() - start) * 1000
        if elapsed_ms > self.SLOW_EVENT_MS:
            # Lazy %-formatting keeps the fast path free of string building.
            self._logger.warning(
                "Slow event processing: transaction_id=%s elapsed_ms=%.1f",
                event.transaction_id,
                elapsed_ms,
            )
        return event
class WindowedCounterStore:
"""
Sliding window counters for velocity features.
Maintains counts and sums across multiple time windows for
each card. Uses a bucketed approach for memory efficiency:
Instead of storing every timestamp, divide time into buckets
(e.g., 1-minute buckets for the 1-hour window). Each bucket
stores the count and sum. To compute the window aggregate,
sum the relevant buckets.
Memory per card per window:
- 1h window with 1-min buckets: 60 buckets × 16 bytes = 960 bytes
- 24h window with 10-min buckets: 144 buckets × 16 bytes = 2.3 KB
- 7d window with 1-hour buckets: 168 buckets × 16 bytes = 2.7 KB
Total per card: ~6 KB (vs ~500 bytes for simple counters)
For 100M cards: ~600 GB (requires distributed state store)
"""
def __init__(self):
# card_hash -> window_name -> list of (bucket_start, count, amount_sum)
self._buckets: dict[str, dict[str, list]] = defaultdict(
lambda: defaultdict(list)
)
self._window_configs = {
"1h": {"duration_sec": 3600, "bucket_sec": 60},
"24h": {"duration_sec": 86400, "bucket_sec": 600},
"7d": {"duration_sec": 604800, "bucket_sec": 3600},
}
def increment(
self, key: str, timestamp: datetime,
amount: Decimal, merchant_id: str
):
ts = int(timestamp.timestamp())
for window_name, config in self._window_configs.items():
bucket_start = ts - (ts % config["bucket_sec"])
buckets = self._buckets[key][window_name]
# Find or create bucket
found = False
for i, (bs, count, total, merchants) in enumerate(buckets):
if bs == bucket_start:
buckets[i] = (
bs, count + 1, total + float(amount),
merchants | {merchant_id}
)
found = True
break
if not found:
buckets.append(
(bucket_start, 1, float(amount), {merchant_id})
)
# Evict expired buckets
cutoff = ts - config["duration_sec"]
self._buckets[key][window_name] = [
b for b in buckets if b[0] >= cutoff
]
def query(self, key: str, window_name: str) -> dict:
"""Query current window aggregate."""
buckets = self._buckets.get(key, {}).get(window_name, [])
if not buckets:
return {
"count": 0,
"sum": 0.0,
"distinct_merchants": 0,
}
total_count = sum(b[1] for b in buckets)
total_sum = sum(b[2] for b in buckets)
all_merchants: set = set()
for b in buckets:
all_merchants |= b[3]
return {
"count": total_count,
"sum": total_sum,
"distinct_merchants": len(all_merchants),
}
Alert Routing and Case Management
When a transaction scores above the alert threshold, it needs to reach the right analyst queue:
@dataclass
class FraudAlert:
    """An alert raised for a high-risk transaction.

    Built by ``AlertRouter.route_alert`` from a scored ``TransactionEvent``
    and passed to the handler registered for its ``priority``.
    """

    alert_id: str  # AlertRouter derives this as "ALT-<transaction_id>"
    transaction_id: str
    card_hash: str
    risk_score: float
    decision: str
    reasons: list[str]  # taken from the "top_reasons" feature; may be empty
    amount: Decimal
    timestamp: datetime  # timestamp of the triggering transaction event
    priority: str  # "critical", "high", "medium", "low"
    assigned_queue: str  # Which analyst queue ("high_value", "organized_fraud", "general" in AlertRouter)
    status: str = "open"  # "open", "investigating", "confirmed_fraud", "false_positive"
class AlertRouter:
    """
    Routes fraud alerts to handlers registered per priority.

    Priority tiers by ``risk_score``, with their intended handling. The
    handling itself is carried out by whichever handler is registered for
    that tier via ``register_handler``:
    - critical (score >= 900): immediate SMS to on-call + L3 queue
    - high (700 <= score < 900): L2 analyst queue (30-min SLA)
    - medium (500 <= score < 700): L1 analyst queue (4-hour SLA)
    - low (score < 500): batch review queue (24-hour SLA)

    If no handler is registered for a tier, the alert is not dispatched.
    It is still built, recorded for suppression, and returned.

    Queue assignment (``FraudAlert.assigned_queue``):
    - amount > 10,000 → "high_value"
    - otherwise, a positive "any_graph_fraud" feature → "organized_fraud"
    - otherwise → "general"

    Alert suppression: at most one alert per card within
    ``SUPPRESSION_WINDOW_SEC``, measured on event timestamps. Suppression
    ignores priority, so a critical score shortly after a lower-priority
    alert for the same card is also suppressed.
    NOTE(review): consider letting escalations bypass suppression.

    Aggregating related alerts into a single case is not done here.
    """

    # Further alerts for the same card are suppressed for this many seconds.
    SUPPRESSION_WINDOW_SEC = 3600
    # Amounts strictly above this go to the "high_value" queue. It is a
    # Decimal so the comparison is exact for Decimal amounts; converting to
    # float would round e.g. 10000.00000000000001 down to 10000.0.
    HIGH_VALUE_AMOUNT = Decimal("10000")

    def __init__(self):
        # card_hash -> timestamp of the last alert emitted for that card.
        # NOTE(review): entries are never evicted, so this grows with the
        # number of distinct alerted cards.
        self._recent_alerts: dict[str, datetime] = {}
        # priority name -> callable that receives the FraudAlert
        self._alert_handlers: dict[str, Callable] = {}

    def register_handler(self, priority: str, handler: Callable):
        """Register *handler* for alerts of *priority*, replacing any previous one."""
        self._alert_handlers[priority] = handler

    def route_alert(
        self, event: TransactionEvent, features: dict
    ) -> FraudAlert | None:
        """Build an alert for *event*, dispatch it, and return it.

        Returns None, without dispatching, if the card already alerted within
        the suppression window. Reads the optional features
        "any_graph_fraud" and "top_reasons" from *features*.
        """
        # Suppression: don't alert on the same card within the window.
        last_alert = self._recent_alerts.get(event.card_hash)
        if (
            last_alert is not None
            and (event.timestamp - last_alert).total_seconds()
            < self.SUPPRESSION_WINDOW_SEC
        ):
            return None

        # Determine priority
        if event.risk_score >= 900:
            priority = "critical"
        elif event.risk_score >= 700:
            priority = "high"
        elif event.risk_score >= 500:
            priority = "medium"
        else:
            priority = "low"

        # Determine queue based on amount and pattern
        if event.amount > self.HIGH_VALUE_AMOUNT:
            queue = "high_value"
        elif features.get("any_graph_fraud", 0) > 0:
            queue = "organized_fraud"
        else:
            queue = "general"

        alert = FraudAlert(
            alert_id=f"ALT-{event.transaction_id}",
            transaction_id=event.transaction_id,
            card_hash=event.card_hash,
            risk_score=event.risk_score,
            decision=event.decision,
            # Copy so the alert does not alias (or get mutated through) the
            # caller's features dict. A None value is treated as empty.
            reasons=list(features.get("top_reasons") or []),
            amount=event.amount,
            timestamp=event.timestamp,
            priority=priority,
            assigned_queue=queue,
        )

        # Route to handler. The suppression timestamp is recorded only after
        # the handler returns, so a raising handler does not suppress retries.
        handler = self._alert_handlers.get(priority)
        if handler:
            handler(alert)

        self._recent_alerts[event.card_hash] = event.timestamp
        return alert
Exactly-Once Processing
Payment monitoring requires exactly-once semantics: a missed transaction undercounts the velocity features (and can let fraud through), while a double-counted transaction inflates them (and can trigger a false positive):
class ExactlyOnceProcessor:
    """
    Deduplicates event processing by tracking the last processed offset per partition.

    Three approaches to exactly-once semantics, in order of complexity:
    1. Idempotent processing: make operations idempotent so that
       reprocessing produces the same result (simplest, used here).
    2. Kafka transactions: atomic read-process-write using Kafka's
       transactional producer (production approach).
    3. External state with fencing: use epoch-based fencing tokens to
       prevent zombie processors from corrupting state.

    Kafka transactions provide the strongest guarantee but add latency per
    event (estimated at ~20ms). For fraud scoring, where p99 latency is
    critical, idempotent processing is preferred.

    Offsets are restored from ``state_store`` when the processor is created.
    A restarted processor therefore skips events at or below the last
    checkpointed offset.

    NOTE(review): offset dedup only yields exactly-once results if
    ``processor``'s side effects are idempotent. Events between the last
    checkpoint and a crash are processed again, and non-idempotent updates
    will be applied twice. For example, ``WindowedCounterStore.increment``
    adds to counts.
    """

    def __init__(self, state_store: 'StateStore'):
        self._state = state_store
        # partition -> last processed offset, restored from the last
        # checkpoint. A None/empty result from the store means "no checkpoint".
        self._processed_offsets: dict[int, int] = dict(
            state_store.load_offsets() or {}
        )

    def process_with_dedup(
        self, partition: int, offset: int,
        event: 'TransactionEvent', processor: Callable
    ) -> 'TransactionEvent | None':
        """
        Process an event unless its offset has already been processed.

        Returns ``processor(event)``. Returns None without calling the
        processor when *offset* is at or below the last recorded offset for
        *partition*.

        The offset is recorded only after ``processor`` returns. If it
        raises, the offset is not recorded and the event will be processed
        again on retry.
        """
        last_offset = self._processed_offsets.get(partition, -1)
        if offset <= last_offset:
            # Already processed — skip
            return None

        result = processor(event)

        # Record the processed offset AFTER successful processing.
        # In production this should be atomic with the state update.
        self._processed_offsets[partition] = offset
        return result

    def checkpoint(self):
        """
        Persist a snapshot of the processed offsets to durable storage.

        On recovery, a new processor reloads these offsets. Events between
        the checkpoint and a crash are reprocessed (at-least-once), so
        correctness depends on the processing being idempotent.

        A copy is handed to the store, so later processing cannot mutate an
        already-saved checkpoint.
        """
        self._state.save_offsets(dict(self._processed_offsets))
class StateStore:
    """Durable offset storage used for checkpoint and restore.

    This base implementation is a no-op: saving discards the offsets and
    loading always yields a fresh, empty mapping. Concrete stores override
    both methods.
    """

    def save_offsets(self, offsets: dict[int, int]):
        """Persist *offsets* (partition -> last processed offset) for crash recovery.

        The base class intentionally stores nothing.
        """
        return None

    def load_offsets(self) -> dict[int, int]:
        """Return the last checkpointed offsets; always empty in the base class."""
        return dict()
Pipeline Performance Characteristics
| Metric | Target | Typical Production |
|---|---|---|
| Event throughput | 50,000/sec | 30,000-80,000/sec |
| p50 latency | < 10ms | 5-8ms |
| p99 latency | < 50ms | 20-40ms |
| p99.9 latency | < 100ms | 50-80ms |
| State store size | — | 300-600 GB (distributed) |
| Recovery time | < 30 sec | 10-20 sec |
| False positive rate | < 1% | 0.5-2% |
| Detection rate | > 90% | 85-95% |
The p99.9 latency is the binding constraint. A payment authorization timeout is typically around 10 seconds, but only about 100ms of that is allotted to fraud scoring. The rest of the budget must cover network hops, HSM operations, and issuer processing. Every millisecond the fraud pipeline adds is a millisecond less for the rest of the authorization chain.