Feature Engineering for Payment Fraud Detection

9 min read Chapter 17 of 21

A fraud model is only as good as its features. Raw transaction data (amount, timestamp, merchant ID) says little about fraud risk on its own. The feature engineering pipeline transforms it into signals that distinguish fraudulent behavior from legitimate patterns.

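In payment fraud, the most predictive features fall into five categories: velocity, behavioral deviation, device intelligence, geographic analysis, and graph relationships. Geographic signals are covered here inside the velocity and behavioral features (distinct countries, new country) rather than in a section of their own.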

Velocity Features

Velocity features measure transaction frequency and volume across time windows. The intuition: a stolen card is used as fast as possible before the cardholder notices.

from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from decimal import Decimal
import hashlib

class VelocityFeatureStore:
    """
    Computes velocity features across multiple time windows.
    
    Architecture: maintains sliding window counters for each
    card/account, updated in real-time as transactions arrive.
    
    In production, this is backed by Redis with sorted sets:
    - Key: "velocity:{card_hash}:{window}"
    - Members: transaction IDs (sorted-set members must be unique)
    - Score: timestamp (for range queries)
    
    Memory estimate: each card needs ~500 bytes per window, so
    100M active cards × 5 windows = ~250 GB of Redis memory.
    """
    
    def __init__(self):
        # card_hash -> list of (timestamp, amount, merchant_id, country)
        self._history: dict[str, list[tuple]] = defaultdict(list)
    
    def record_transaction(
        self, card_hash: str, timestamp: datetime,
        amount: Decimal, merchant_id: str, country: str
    ):
        self._history[card_hash].append(
            (timestamp, amount, merchant_id, country)
        )
    
    def compute_features(
        self, card_hash: str, current_time: datetime
    ) -> dict[str, float]:
        """
        Compute all velocity features for a card at a given time.
        
        Features span multiple time windows (1h, 6h, 24h, 7d, 30d)
        and multiple aggregation levels (count, sum, distinct).
        """
        history = self._history.get(card_hash, [])
        
        windows = {
            "1h": timedelta(hours=1),
            "6h": timedelta(hours=6),
            "24h": timedelta(hours=24),
            "7d": timedelta(days=7),
            "30d": timedelta(days=30),
        }
        
        features = {}
        
        for window_name, window_delta in windows.items():
            cutoff = current_time - window_delta
            window_txns = [
                (ts, amt, mid, country) 
                for ts, amt, mid, country in history
                if ts >= cutoff
            ]
            
            # Transaction count
            features[f"txn_count_{window_name}"] = len(window_txns)
            
            # Total amount
            features[f"txn_amount_sum_{window_name}"] = float(
                sum(amt for _, amt, _, _ in window_txns)
            )
            
            # Average amount
            features[f"txn_amount_avg_{window_name}"] = (
                features[f"txn_amount_sum_{window_name}"] / len(window_txns)
                if window_txns else 0.0
            )
            
            # Max amount
            features[f"txn_amount_max_{window_name}"] = float(
                max((amt for _, amt, _, _ in window_txns), default=0)
            )
            
            # Distinct merchants
            features[f"distinct_merchants_{window_name}"] = len(
                set(mid for _, _, mid, _ in window_txns)
            )
            
            # Distinct countries
            features[f"distinct_countries_{window_name}"] = len(
                set(c for _, _, _, c in window_txns)
            )
            
            # Time since last transaction (seconds)
            if window_txns:
                latest = max(ts for ts, _, _, _ in window_txns)
                features[f"seconds_since_last_txn_{window_name}"] = (
                    current_time - latest
                ).total_seconds()
            else:
                features[f"seconds_since_last_txn_{window_name}"] = -1
        
        # Inter-transaction time features
        if len(history) >= 2:
            sorted_times = sorted(ts for ts, _, _, _ in history)
            deltas = [
                (sorted_times[i] - sorted_times[i-1]).total_seconds()
                for i in range(1, len(sorted_times))
            ]
            features["avg_inter_txn_seconds"] = sum(deltas) / len(deltas)
            features["min_inter_txn_seconds"] = min(deltas)
        else:
            features["avg_inter_txn_seconds"] = -1
            features["min_inter_txn_seconds"] = -1
        
        return features
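
The in-memory store above appends to each card's history without ever removing anything, and it rescans the full list for every window. A minimal sketch of how eviction and range counting could work is shown below, using a sorted list and binary search as a stand-in for the Redis sorted-set operations mentioned in the docstring (roughly ZADD, ZREMRANGEBYSCORE, and ZCOUNT). The class name `PrunedWindowIndex` and its methods are hypothetical and not part of the original code.

```python
from bisect import bisect_left, bisect_right
from collections import defaultdict
from datetime import datetime, timedelta


class PrunedWindowIndex:
    """Hypothetical helper: per-card sorted timestamps with eviction."""

    def __init__(self, max_window: timedelta = timedelta(days=30)):
        self._max_window = max_window
        # card_hash -> ascending list of transaction timestamps
        self._times: dict[str, list[datetime]] = defaultdict(list)

    def add(self, card_hash: str, ts: datetime) -> None:
        times = self._times[card_hash]
        # Insert in sorted position so late-arriving events stay ordered.
        times.insert(bisect_left(times, ts), ts)
        # Evict entries older than the largest window, measured from the
        # newest event seen for this card (not from wall-clock time).
        cutoff = times[-1] - self._max_window
        del times[:bisect_left(times, cutoff)]

    def count(self, card_hash: str, now: datetime, window: timedelta) -> int:
        """Count timestamps in the closed interval [now - window, now]."""
        times = self._times.get(card_hash, [])
        lo = bisect_left(times, now - window)
        hi = bisect_right(times, now)
        return hi - lo

    def size(self, card_hash: str) -> int:
        """Number of timestamps currently retained for a card."""
        return len(self._times.get(card_hash, []))


now = datetime(2024, 1, 1, 12, 0)
index = PrunedWindowIndex()
for age in (timedelta(days=40), timedelta(days=2), timedelta(minutes=30), timedelta(0)):
    index.add("card_a", now - age)

count_1h = index.count("card_a", now, timedelta(hours=1))
count_7d = index.count("card_a", now, timedelta(days=7))
retained = index.size("card_a")
```

The 40-day-old transaction is evicted as soon as a newer one arrives, so memory per card stays bounded by the activity in the largest window. The lower bound is inclusive, matching the `ts >= cutoff` filter in `compute_features`.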

Behavioral Deviation Features

Behavioral features compare the current transaction against the cardholder’s established patterns. The key principle: what’s normal for one person may be anomalous for another.

import math

@dataclass
class CardholderProfile:
    """
    Statistical profile of a cardholder's normal behavior.
    
    Built from the last 90 days of transaction history.
    Can be updated incrementally with each new transaction.
    
    In production, profiles are typically recomputed in batch
    (nightly) and stored in a profile store (DynamoDB/Cassandra)
    for real-time lookup during scoring.
    """
    card_hash: str
    
    # Amount statistics
    avg_amount: float = 0.0
    std_amount: float = 0.0
    median_amount: float = 0.0
    p95_amount: float = 0.0
    max_amount: float = 0.0
    
    # Temporal patterns
    typical_hours: set[int] = field(default_factory=set)     # Hours when cardholder transacts
    typical_days: set[int] = field(default_factory=set)       # Days of week (0=Mon)
    
    # Geographic patterns
    home_country: str = ""
    typical_countries: set[str] = field(default_factory=set)
    
    # Merchant patterns
    typical_mccs: set[str] = field(default_factory=set)       # Merchant Category Codes
    typical_merchants: set[str] = field(default_factory=set)   # Specific merchants
    
    # Channel patterns
    typical_channels: set[str] = field(default_factory=set)    # ecom, pos, atm
    
    # Activity level
    avg_daily_txn_count: float = 0.0
    avg_weekly_amount: float = 0.0
    total_transaction_count: int = 0

class BehavioralDeviationEngine:
    """
    Computes how much a transaction deviates from the cardholder's
    established behavioral profile.
    """
    
    def compute_features(
        self, txn_amount: float, txn_hour: int, txn_day: int,
        txn_country: str, txn_mcc: str, txn_channel: str,
        merchant_id: str, profile: CardholderProfile
    ) -> dict[str, float]:
        features = {}
        
        # Amount deviation (z-score)
        if profile.std_amount > 0:
            features["amount_zscore"] = (
                (txn_amount - profile.avg_amount) / profile.std_amount
            )
        else:
            features["amount_zscore"] = 0.0
        
        # Amount ratio to profile statistics
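        features["amount_to_avg_ratio"] = (
            txn_amount / profile.avg_amount if profile.avg_amount > 0 else 0.0
        )
        features["amount_to_p95_ratio"] = (
            txn_amount / profile.p95_amount if profile.p95_amount > 0 else 0.0
        )
        # Only flag when the profile has history; an empty profile has
        # max_amount == 0 and would otherwise flag every transaction.
        features["exceeds_max"] = float(
            profile.max_amount > 0 and txn_amount > profile.max_amount
        )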
        
        # Temporal deviation
        features["unusual_hour"] = float(
            txn_hour not in profile.typical_hours
        ) if profile.typical_hours else 0.0
        features["unusual_day"] = float(
            txn_day not in profile.typical_days
        ) if profile.typical_days else 0.0
        
        # Geographic deviation
        features["new_country"] = float(
            txn_country not in profile.typical_countries
        ) if profile.typical_countries else 0.0
        features["is_home_country"] = float(
            txn_country == profile.home_country
        )
        
        # Merchant deviation
        features["new_mcc"] = float(
            txn_mcc not in profile.typical_mccs
        ) if profile.typical_mccs else 0.0
        features["new_merchant"] = float(
            merchant_id not in profile.typical_merchants
        ) if profile.typical_merchants else 0.0
        
        # Channel deviation
        features["new_channel"] = float(
            txn_channel not in profile.typical_channels
        ) if profile.typical_channels else 0.0
        
        # Composite novelty score: how many things are "new"?
        novelty_flags = [
            features["unusual_hour"],
            features["unusual_day"],
            features["new_country"],
            features["new_mcc"],
            features["new_merchant"],
            features["new_channel"],
        ]
        features["novelty_count"] = sum(novelty_flags)
        features["novelty_score"] = sum(novelty_flags) / len(novelty_flags)
        
        return features
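
The profile docstring mentions incremental updates, but no update logic is shown. One common way to maintain a running mean and standard deviation without storing every amount is Welford's online algorithm. The sketch below is an illustration under that assumption, not the chapter's own method; `RunningAmountStats` is a hypothetical helper, and it does not expire observations older than 90 days. It also illustrates the principle stated earlier: the same 500.00 purchase is extreme for one cardholder and ordinary for another.

```python
import math
from dataclasses import dataclass


@dataclass
class RunningAmountStats:
    """Hypothetical helper: online mean/std via Welford's algorithm."""
    count: int = 0
    mean: float = 0.0
    m2: float = 0.0  # running sum of squared deviations from the mean

    def update(self, amount: float) -> None:
        self.count += 1
        delta = amount - self.mean
        self.mean += delta / self.count
        # Uses the deviation from the old mean and from the new mean.
        self.m2 += delta * (amount - self.mean)

    @property
    def std(self) -> float:
        # Sample standard deviation; 0.0 until two observations exist.
        return math.sqrt(self.m2 / (self.count - 1)) if self.count > 1 else 0.0

    def zscore(self, amount: float) -> float:
        # Same guard as amount_zscore: no deviation without a spread.
        return (amount - self.mean) / self.std if self.std > 0 else 0.0


frugal = RunningAmountStats()
for amt in [10.0, 12.0, 8.0, 11.0, 9.0]:
    frugal.update(amt)

big_spender = RunningAmountStats()
for amt in [400.0, 650.0, 300.0, 800.0, 550.0]:
    big_spender.update(amt)

# The same 500.00 transaction scored against each cardholder.
z_frugal = frugal.zscore(500.0)
z_big = big_spender.zscore(500.0)
```

For the first cardholder the z-score is in the hundreds; for the second it is a fraction of one standard deviation below the mean.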

Device Fingerprinting

Device fingerprinting creates a probabilistic identifier for the device or browser being used. Legitimate cardholders typically use 1-3 devices; fraudsters use many (often with spoofing tools):

@dataclass
class DeviceFingerprint:
    """
    Device fingerprint computed from browser/device attributes.
    
    Entropy sources (browser):
    - User-Agent string (~10 bits)
    - Screen resolution + color depth (~8 bits)
    - Timezone (~4 bits)
    - Installed fonts (~15 bits via canvas fingerprinting)
    - WebGL renderer (~12 bits)
    - AudioContext fingerprint (~8 bits)
    - Canvas fingerprint (~18 bits)
    - Battery API, Hardware concurrency, Device memory
    
    Total entropy: ~75 bits if the sources were independent. In
    practice they are correlated, so the effective entropy is
    lower, but the combined signals are usually still enough to
    distinguish most browser instances from one another.
    """
    user_agent: str
    screen_width: int
    screen_height: int
    color_depth: int
    timezone_offset: int     # Minutes from UTC
    language: str
    platform: str
    hardware_concurrency: int
    device_memory: float     # GB
    webgl_renderer: str
    canvas_hash: str         # Hash of canvas rendering test
    audio_hash: str          # Hash of AudioContext output
    fonts_hash: str          # Hash of detected system fonts
    
    def compute_fingerprint_hash(self) -> str:
        """
        Compute a stable hash from all fingerprint components.
        
        Uses SHA-256 for collision resistance. The hash is stored
        in the profile store and compared against new transactions.
        """
        components = (
            f"{self.user_agent}|"
            f"{self.screen_width}x{self.screen_height}x{self.color_depth}|"
            f"{self.timezone_offset}|"
            f"{self.language}|{self.platform}|"
            f"{self.hardware_concurrency}|{self.device_memory}|"
            f"{self.webgl_renderer}|"
            f"{self.canvas_hash}|{self.audio_hash}|{self.fonts_hash}"
        )
        return hashlib.sha256(components.encode()).hexdigest()
    
    def compute_entropy_score(self) -> float:
        """
        Estimate the uniqueness of this fingerprint.
        
        Low entropy → common device configuration → less useful for ID
        High entropy → unique configuration → strong identifier
        
        Fraud indicator: very LOW entropy suggests fingerprint spoofing
        (fraudsters using tools like Multilogin or GoLogin that
        generate generic fingerprints).
        """
        entropy = 0.0
        
        # User-Agent entropy (longer = more specific = higher entropy)
        entropy += min(len(self.user_agent) / 20.0, 5.0)
        
        # Screen resolution entropy
        common_resolutions = {
            (1920, 1080), (1366, 768), (1536, 864),
            (1440, 900), (2560, 1440), (3840, 2160)
        }
        if (self.screen_width, self.screen_height) not in common_resolutions:
            entropy += 3.0
        else:
            entropy += 1.0
        
        # Canvas + Audio entropy (these are the most unique signals)
        if self.canvas_hash:
            entropy += 4.0
        if self.audio_hash:
            entropy += 3.0
        
        # Font detection entropy
        if self.fonts_hash:
            entropy += 4.0
        
        return entropy

class DeviceFeatureEngine:
    """
    Compute fraud-relevant features from device fingerprints.
    """
    
    def __init__(self):
        # card_hash -> set of known fingerprint hashes
        self._known_devices: dict[str, set[str]] = defaultdict(set)
    
    def compute_features(
        self, card_hash: str, fingerprint: DeviceFingerprint
    ) -> dict[str, float]:
        fp_hash = fingerprint.compute_fingerprint_hash()
        known = self._known_devices.get(card_hash, set())
        
        features = {}
        
        # Is this a known device for this card?
        features["is_known_device"] = float(fp_hash in known)
        
        # Number of known devices
        features["num_known_devices"] = len(known)
        
        # Entropy score (low entropy = possible spoofing)
        entropy_score = fingerprint.compute_entropy_score()
        features["device_entropy"] = entropy_score
        features["possible_spoofing"] = float(entropy_score < 3.0)
        
        # Update known devices
        self._known_devices[card_hash].add(fp_hash)
        
        return features
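
The `compute_entropy_score` method uses fixed heuristic weights, while the docstring quotes entropy in bits. A data-driven alternative is to estimate how surprising each attribute value is from its observed frequency, using self-information, -log2(p). The sketch below assumes a counter of attribute values seen across all sessions is available; the function name `surprisal_bits`, the add-one smoothing, and the sample counts are illustrative assumptions.

```python
import math
from collections import Counter


def surprisal_bits(value: str, population: Counter) -> float:
    """Self-information -log2(p) of one attribute value, in bits."""
    total = sum(population.values())
    # Add-one smoothing so values never seen before get a finite,
    # high surprisal instead of a division by zero or log(0).
    p = (population[value] + 1) / (total + len(population) + 1)
    return -math.log2(p)


# Hypothetical counts of screen resolutions observed across sessions.
resolutions = Counter({"1920x1080": 700, "1366x768": 250, "1600x1000": 2})

common_bits = surprisal_bits("1920x1080", resolutions)
rare_bits = surprisal_bits("1600x1000", resolutions)
unseen_bits = surprisal_bits("1234x567", resolutions)
```

A very common resolution contributes well under one bit, while a rare or unseen one contributes several. Summing these values across attributes overstates the total because attributes such as platform, fonts, and WebGL renderer are correlated.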

Graph-Based Features

Organized fraud rings share resources — devices, IP addresses, shipping addresses, phone numbers. Graph features detect these connections:

class FraudGraphFeatureEngine:
    """
    Compute features from a transaction graph.
    
    The graph connects entities through shared attributes:
    - Card → Device (used from this device)
    - Card → IP (used from this IP)
    - Card → Address (shipped to this address)
    - Card → Merchant (purchased at this merchant)
    - Card → Phone (registered with this phone)
    
    A fraud ring shows up as a dense cluster: multiple cards
    sharing the same device/IP/address, with high fraud rates
    on connected nodes.
    """
    
    def __init__(self):
        # Bipartite edges: entity_type:entity_value -> set of card_hashes
        self._graph: dict[str, set[str]] = defaultdict(set)
        # Fraud labels
        self._fraud_cards: set[str] = set()
    
    def add_edge(self, card_hash: str, entity_type: str, entity_value: str):
        key = f"{entity_type}:{entity_value}"
        self._graph[key].add(card_hash)
    
    def mark_fraud(self, card_hash: str):
        self._fraud_cards.add(card_hash)
    
    def compute_features(
        self, card_hash: str, device_fp: str, ip_address: str,
        shipping_address_hash: str
    ) -> dict[str, float]:
        features = {}
        
        # For each shared attribute, count connected cards and fraud rate
        for entity_type, entity_value in [
            ("device", device_fp),
            ("ip", ip_address),
            ("address", shipping_address_hash),
        ]:
            if not entity_value:
                features[f"graph_{entity_type}_shared_cards"] = 0
                features[f"graph_{entity_type}_fraud_rate"] = 0.0
                continue
            
            key = f"{entity_type}:{entity_value}"
            connected_cards = self._graph.get(key, set())
            
            # Number of other cards sharing this attribute
            other_cards = connected_cards - {card_hash}
            features[f"graph_{entity_type}_shared_cards"] = len(other_cards)
            
            # Fraud rate among connected cards
            if other_cards:
                fraud_count = len(other_cards & self._fraud_cards)
                features[f"graph_{entity_type}_fraud_rate"] = (
                    fraud_count / len(other_cards)
                )
            else:
                features[f"graph_{entity_type}_fraud_rate"] = 0.0
        
        # Aggregate graph risk
        fraud_rates = [
            features[f"graph_{et}_fraud_rate"]
            for et in ["device", "ip", "address"]
        ]
        features["max_graph_fraud_rate"] = max(fraud_rates)
        features["any_graph_fraud"] = float(any(r > 0 for r in fraud_rates))
        
        # Degree: total shared-card links across device, IP, and address.
        # Unnormalized; a card linked through two attributes counts twice.
        total_connections = sum(
            features[f"graph_{et}_shared_cards"]
            for et in ["device", "ip", "address"]
        )
        features["graph_degree_centrality"] = total_connections
        
        return features
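
The features above only look one hop out: cards that directly share a device, IP, or address with the current card. A fraud ring can also be linked transitively, for example card A shares a device with B, and B shares an IP with C. A minimal sketch of a breadth-first traversal over the same entity-to-cards index is shown below. The function `find_ring` and its `max_cards` cap are hypothetical additions for illustration; a production system would more likely run this kind of query in the graph database.

```python
from collections import defaultdict, deque


def find_ring(
    start_card: str,
    entity_to_cards: dict[str, set[str]],
    max_cards: int = 1000,
) -> set[str]:
    """Return cards reachable from start_card through shared entities."""
    # Invert the entity -> cards index to get card -> entities.
    card_to_entities: dict[str, set[str]] = defaultdict(set)
    for entity, cards in entity_to_cards.items():
        for card in cards:
            card_to_entities[card].add(entity)

    seen = {start_card}
    queue = deque([start_card])
    # Soft cap: checked once per dequeued card, so the result can
    # slightly exceed max_cards on very dense graphs.
    while queue and len(seen) < max_cards:
        card = queue.popleft()
        for entity in card_to_entities.get(card, ()):
            for neighbor in entity_to_cards[entity]:
                if neighbor not in seen:
                    seen.add(neighbor)
                    queue.append(neighbor)
    return seen


# Keys follow the "entity_type:entity_value" convention used above.
graph = {
    "device:d1": {"card_a", "card_b"},
    "ip:203.0.113.7": {"card_b", "card_c"},
    "address:x9": {"card_d"},
}

ring_a = find_ring("card_a", graph)
ring_d = find_ring("card_d", graph)
```

Here `card_c` never shares an attribute with `card_a` directly, so the one-hop features would miss it, but the traversal places both in the same cluster.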

Feature Pipeline Summary

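Category             | # Features | Latency | Data Source
---------------------|------------|---------|-------------------------------
Velocity             | ~35        | < 2 ms  | Redis sorted sets
Behavioral deviation | ~15        | < 1 ms  | Profile store (Cassandra)
Device fingerprint   | ~5         | < 1 ms  | In-memory + profile store
Graph features       | ~10        | < 5 ms  | Graph database (Neo4j/Neptune)
Transaction raw      | ~20        | < 1 ms  | Transaction message
Total                | ~85        | < 10 ms |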
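
Each engine in this chapter returns its own dictionary of features, and a model expects a single vector in a fixed order. One way the groups could be merged is sketched below; `assemble_feature_vector`, the `feature_order` list, and the default value for missing features are assumptions for illustration, not part of the original pipeline.

```python
def assemble_feature_vector(
    groups: dict[str, dict[str, float]],
    feature_order: list[str],
    default: float = 0.0,
) -> list[float]:
    """Merge per-category feature dicts into one fixed-order vector."""
    merged: dict[str, float] = {}
    for group_name, feats in groups.items():
        for name, value in feats.items():
            # Two engines emitting the same name would silently
            # overwrite each other, so fail loudly instead.
            if name in merged:
                raise ValueError(f"duplicate feature {name!r} from {group_name}")
            merged[name] = float(value)
    # Features missing from every group fall back to the default.
    return [merged.get(name, default) for name in feature_order]


groups = {
    "velocity": {"txn_count_24h": 3},
    "behavioral": {"amount_zscore": 2.5},
}
order = ["txn_count_24h", "amount_zscore", "max_graph_fraud_rate"]

vector = assemble_feature_vector(groups, order)
```

Keeping the order in one explicit list means the vector layout at scoring time can be checked against the layout used in training.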

The most predictive individual features (by SHAP importance in production models):

  1. amount_zscore — how unusual is this amount for this cardholder
  2. is_known_device — device recognition
  3. max_graph_fraud_rate — fraud proximity in the entity graph
  4. txn_count_24h — velocity
  5. novelty_score — how many behavioral dimensions are simultaneously unusual