Skip to main content
aws in the trenches advanced cloud engineering for senior developers

Event-Driven Architectures at Scale

6 min read Chapter 7 of 21

Event-Driven Architectures at Scale

Event-driven architecture on AWS looks straightforward in diagrams: boxes connected by arrows through SQS queues and SNS topics. In production, it’s a minefield of invisible guarantees, ordering assumptions, poison pill messages, and partial failure modes that make synchronous request-response architectures seem simple by comparison.

This chapter covers the mechanics you need to build event-driven systems that survive real traffic — not the toy examples from AWS blog posts.

SQS: The Deceptively Complex Queue

SQS Standard queues provide at-least-once delivery with best-effort ordering. FIFO queues provide exactly-once processing with strict ordering within a message group. The choice between them has deep implications:

Event-Driven Architecture Patterns

Standard Queue Mechanics

import boto3
import json
import time
from typing import Optional

sqs = boto3.client('sqs')
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/order-processing'

def send_with_deduplication(message: dict, dedup_id: Optional[str] = None):
    """
    Standard queues don't support native deduplication.
    You must implement it yourself at the consumer side.
    """
    params = {
        'QueueUrl': QUEUE_URL,
        'MessageBody': json.dumps(message),
        'MessageAttributes': {
            'EventType': {
                'DataType': 'String',
                'StringValue': message.get('event_type', 'unknown')
            },
            'Timestamp': {
                'DataType': 'Number',
                'StringValue': str(int(time.time() * 1000))
            }
        }
    }

    # Delay delivery (0-900 seconds) — useful for retry backoff
    if message.get('retry_count', 0) > 0:
        delay = min(900, 2 ** message['retry_count'] * 10)
        params['DelaySeconds'] = delay

    return sqs.send_message(**params)

def consume_with_visibility_management():
    """
    Critical: Visibility timeout must exceed your processing time.
    If processing takes longer than visibility timeout, the message
    becomes visible again and another consumer picks it up → duplicate processing.
    """
    while True:
        response = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,  # Batch for efficiency (1-10)
            WaitTimeSeconds=20,       # Long polling — ALWAYS use this
            VisibilityTimeout=300,    # 5 minutes to process
            MessageAttributeNames=['All']
        )

        messages = response.get('Messages', [])
        if not messages:
            continue

        for message in messages:
            receipt_handle = message['ReceiptHandle']
            try:
                body = json.loads(message['Body'])
                process_order(body)

                # Delete ONLY after successful processing
                sqs.delete_message(
                    QueueUrl=QUEUE_URL,
                    ReceiptHandle=receipt_handle
                )
            except TransientError:
                # Let visibility timeout expire → automatic retry
                # Optionally extend visibility if we need more time:
                sqs.change_message_visibility(
                    QueueUrl=QUEUE_URL,
                    ReceiptHandle=receipt_handle,
                    VisibilityTimeout=600  # Give 10 more minutes
                )
            except PoisonPillError:
                # Move to DLQ explicitly (or let maxReceiveCount handle it)
                sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=receipt_handle)
                send_to_dlq(message)
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;
import java.time.Duration;

public class SqsConsumer {

    private final SqsClient sqs = SqsClient.create();
    private final ObjectMapper mapper = new ObjectMapper();
    private static final String QUEUE_URL =
        "https://sqs.us-east-1.amazonaws.com/123456789012/order-processing";

    public void pollAndProcess() {
        while (true) {
            ReceiveMessageResponse response = sqs.receiveMessage(ReceiveMessageRequest.builder()
                .queueUrl(QUEUE_URL)
                .maxNumberOfMessages(10)
                .waitTimeSeconds(20)            // Long polling
                .visibilityTimeout(300)          // 5 min processing window
                .messageAttributeNames("All")
                .build());

            for (Message message : response.messages()) {
                try {
                    Map<String, Object> body = mapper.readValue(
                        message.body(), Map.class);
                    processOrder(body);

                    // Delete only after successful processing
                    sqs.deleteMessage(DeleteMessageRequest.builder()
                        .queueUrl(QUEUE_URL)
                        .receiptHandle(message.receiptHandle())
                        .build());

                } catch (TransientException e) {
                    // Extend visibility for retry
                    sqs.changeMessageVisibility(ChangeMessageVisibilityRequest.builder()
                        .queueUrl(QUEUE_URL)
                        .receiptHandle(message.receiptHandle())
                        .visibilityTimeout(600)
                        .build());
                } catch (Exception e) {
                    // Let maxReceiveCount send it to DLQ
                    System.err.println("Unrecoverable: " + e.getMessage());
                }
            }
        }
    }
}

FIFO Queues: When Order Matters

FIFO queues guarantee that messages within the same MessageGroupId are processed in exactly the order they were sent. But the throughput ceiling is real: 300 messages/sec per queue (3,000 with batching and high-throughput mode).

# FIFO queue: Processing account balance updates in order
FIFO_QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/balance-updates.fifo'

def send_balance_update(account_id: str, amount: float, transaction_id: str):
    """
    MessageGroupId = account_id → all updates for one account are ordered.
    Different accounts can be processed in parallel.
    """
    sqs.send_message(
        QueueUrl=FIFO_QUEUE_URL,
        MessageBody=json.dumps({
            'account_id': account_id,
            'amount': amount,
            'transaction_id': transaction_id
        }),
        MessageGroupId=account_id,  # Ordering key
        MessageDeduplicationId=transaction_id  # Exactly-once within 5-min window
    )

# With 10,000 unique account IDs as MessageGroupIds,
# you get 10,000 independent ordered streams, each processing in parallel.
# Total throughput: up to 300 × number_of_message_groups (with high-throughput mode)

SNS Fan-Out with Subscription Filters

SNS topics + SQS subscriptions create the pub/sub pattern. The power feature is subscription filter policies — each subscriber only receives messages matching its filter:

sns = boto3.client('sns')

# Create subscriptions with filters — each service only gets relevant events
def setup_filtered_subscriptions(topic_arn: str):
    # Payment service: only order.created and order.cancelled
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint='arn:aws:sqs:us-east-1:123456789012:payment-processing',
        Attributes={
            'FilterPolicy': json.dumps({
                'event_type': ['order.created', 'order.cancelled'],
                'amount': [{'numeric': ['>=', 0]}]
            }),
            'FilterPolicyScope': 'MessageAttributes'  # or 'MessageBody' for body-based filtering
        }
    )

    # Notification service: all order events except internal updates
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint='arn:aws:sqs:us-east-1:123456789012:notification-service',
        Attributes={
            'FilterPolicy': json.dumps({
                'event_type': [{'prefix': 'order.'}],
                'internal': [{'exists': False}]  # Exclude internal events
            }),
            'FilterPolicyScope': 'MessageAttributes'
        }
    )

    # Analytics: everything (no filter)
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint='arn:aws:sqs:us-east-1:123456789012:analytics-ingestion'
    )

# Publishing with attributes for filtering
def publish_order_event(topic_arn: str, event: dict):
    sns.publish(
        TopicArn=topic_arn,
        Message=json.dumps(event),
        MessageAttributes={
            'event_type': {
                'DataType': 'String',
                'StringValue': event['event_type']
            },
            'amount': {
                'DataType': 'Number',
                'StringValue': str(event.get('amount', 0))
            }
        }
    )

EventBridge: The Enterprise Event Bus

EventBridge adds content-based routing, schema registry, archive/replay, and pipe transformations on top of the basic pub/sub model. Use it when your event routing logic is complex:

events = boto3.client('events')

# Rule: Route high-value orders to a dedicated processing pipeline
events.put_rule(
    Name='high-value-order-routing',
    EventBusName='orders',
    EventPattern=json.dumps({
        'source': ['com.myapp.orders'],
        'detail-type': ['OrderCreated'],
        'detail': {
            'total': [{'numeric': ['>', 1000]}],
            'customer_tier': ['enterprise', 'premium']
        }
    }),
    State='ENABLED'
)

# Target: Step Functions for complex orchestration
events.put_targets(
    Rule='high-value-order-routing',
    EventBusName='orders',
    Targets=[{
        'Id': 'high-value-processor',
        'Arn': 'arn:aws:states:us-east-1:123456789012:stateMachine:high-value-order-flow',
        'RoleArn': 'arn:aws:iam::123456789012:role/EventBridgeToStepFunctions',
        # Transform the event before passing to Step Functions
        'InputTransformer': {
            'InputPathsMap': {
                'orderId': '$.detail.order_id',
                'customerId': '$.detail.customer_id',
                'total': '$.detail.total'
            },
            'InputTemplate': '{"order_id": <orderId>, "customer_id": <customerId>, "amount": <total>, "priority": "high"}'
        }
    }]
)

# Archive: Store all events for replay (debugging, reprocessing)
events.create_archive(
    ArchiveName='all-order-events',
    EventSourceArn='arn:aws:events:us-east-1:123456789012:event-bus/orders',
    EventPattern=json.dumps({
        'source': ['com.myapp.orders']
    }),
    RetentionDays=90
)

Choreography vs Orchestration

Choreography (events): Each service reacts independently to events. No central coordinator. Services are loosely coupled but the overall workflow is invisible — it only exists in the aggregate behavior of all services.

Orchestration (Step Functions): A central state machine defines the workflow explicitly. Each step calls a service and handles its response. The workflow is visible, debuggable, and modifiable in one place.

AspectChoreographyOrchestration
CouplingServices know nothing about each otherOrchestrator knows all services
VisibilityNo single view of workflow stateFull workflow visible in one place
Failure handlingEach service handles its own errorsCentralized error handling and retries
Adding stepsPublish new event + add new subscriberModify the state machine definition
DebuggingCorrelate logs across services with trace IDStep Functions execution history shows everything
ScalingEach service scales independentlyStep Functions has its own throughput limits

Rule of thumb: Use choreography for independent reactions (notifications, analytics, audit). Use orchestration for sequential workflows with branching logic (order fulfillment, onboarding flows, data pipelines).