Skip to main content
aws in the trenches advanced cloud engineering for senior developers

Automated Incident Response and Runbook Automation

6 min read Chapter 18 of 21

Automated Incident Response and Runbook Automation

Manual incident response doesn’t scale. When your alarm fires at 3 AM, the on-call engineer must wake up, VPN in, identify the issue, remember the fix, and execute it correctly — all while half-asleep. Automating the first-response actions — scaling up, failing over, isolating bad instances — buys time for humans to handle the complex cases.

The Automated Response Pipeline

CloudWatch Alarm → SNS → EventBridge Rule → Lambda (remediation) → SNS (notification)
     ↓                                           ↓
  [Metric breach]                         [Auto-fix + document]
import boto3
import json
import time

# Lambda: Automated response to DynamoDB throttling
def handle_dynamodb_throttling(event, context):
    """
    Respond to a CloudWatch alarm on DynamoDB ThrottledRequests.

    For a table in PROVISIONED billing mode, read and write capacity are
    each raised to 1.5x their current value, rounded up. Tables in any other
    billing mode are left untouched. In both cases the team is notified and
    an incident timeline entry is recorded.

    Args:
        event: Alarm event; ``event['detail']['alarmName']`` must be present.
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and ``action`` describing what was done.
    """
    alarm_name = event['detail']['alarmName']

    # Extract table name from alarm configuration
    # (We encode it in alarm description or dimensions)
    table_name = extract_table_name_from_alarm(alarm_name)

    dynamodb = boto3.client('dynamodb')

    # Get current table status. A missing BillingModeSummary is treated as
    # PROVISIONED.
    table = dynamodb.describe_table(TableName=table_name)['Table']
    billing_mode = table.get('BillingModeSummary', {}).get('BillingMode', 'PROVISIONED')

    if billing_mode == 'PROVISIONED':
        throughput = table['ProvisionedThroughput']
        current_read = throughput['ReadCapacityUnits']
        current_write = throughput['WriteCapacityUnits']

        # Integer ceil of 1.5x. Truncating with int(n * 1.5) leaves a
        # capacity of 1 unchanged, and UpdateTable is expected to reject a
        # request that does not change the throughput — TODO confirm.
        new_read = (current_read * 3 + 1) // 2
        new_write = (current_write * 3 + 1) // 2

        dynamodb.update_table(
            TableName=table_name,
            ProvisionedThroughput={
                'ReadCapacityUnits': new_read,
                'WriteCapacityUnits': new_write
            }
        )

        action_taken = (
            f"Increased capacity: RCU {current_read}→{new_read}, "
            f"WCU {current_write}→{new_write}"
        )
    else:
        # On-demand mode shouldn't throttle unless hitting account limits.
        # NOTE: no limit check is performed here; the message only tells the
        # responder where to look next.
        action_taken = "Table is on-demand — checking account-level limits"

    # Notify the team about the automated action
    notify_team(
        severity='warning',
        title=f'Auto-remediation: DynamoDB throttling on {table_name}',
        details=action_taken,
        alarm_name=alarm_name
    )

    # Create incident timeline entry
    create_timeline_entry(
        incident_type='dynamodb_throttling',
        resource=table_name,
        action=action_taken,
        automated=True
    )

    return {'statusCode': 200, 'action': action_taken}


# Lambda: Auto-remediation for unhealthy ECS tasks
def handle_ecs_health_failure(event, context):
    """
    Auto-remediate an ECS service whose running task count has dropped.

    If fewer than half of the desired tasks are running, a new deployment is
    forced so every task is replaced. Otherwise the desired count is raised
    by two as temporary buffer capacity. The team is notified either way.

    Args:
        event: Event carrying ``detail.cluster`` and ``detail.service``.
        context: Lambda context object (unused).

    Returns:
        dict with a single ``action`` key describing the remediation.

    Raises:
        LookupError: if DescribeServices returns no entry for the service.
    """
    cluster = event['detail']['cluster']
    service = event['detail']['service']

    ecs = boto3.client('ecs')

    # Check current deployment status. Fail with a descriptive error instead
    # of a bare IndexError when the response contains no matching service.
    services = ecs.describe_services(cluster=cluster, services=[service])['services']
    if not services:
        raise LookupError(
            f"ECS service {service!r} not found in cluster {cluster!r}"
        )
    svc = services[0]
    running = svc['runningCount']
    desired = svc['desiredCount']

    if running < desired * 0.5:
        # Fewer than half of the desired tasks are running — force new deployment
        ecs.update_service(
            cluster=cluster,
            service=service,
            forceNewDeployment=True
        )
        action = f"Forced new deployment (running: {running}/{desired})"
    else:
        # At least half are running — scale up temporarily
        ecs.update_service(
            cluster=cluster,
            service=service,
            desiredCount=desired + 2  # Add buffer capacity
        )
        action = f"Scaled up: {desired}→{desired + 2}"

    notify_team(severity='high', title=f'ECS auto-remediation: {service}', details=action)
    return {'action': action}
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import software.amazon.awssdk.services.ecs.EcsClient;
import software.amazon.awssdk.services.ecs.model.*;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import java.util.Map;

/**
 * Lambda handler that reacts to a DynamoDB throttling alarm by raising the
 * table's provisioned read and write capacity to 1.5x (rounded up).
 *
 * Tables in on-demand (PAY_PER_REQUEST) billing mode are skipped, mirroring
 * the Python handler: they have no provisioned throughput to raise.
 *
 * NOTE(review): getNestedValue and extractTableName are not defined in this
 * class as shown; they must be supplied before this compiles.
 */
public class AutoRemediation implements RequestHandler<Map<String, Object>, Map<String, String>> {

    private final DynamoDbClient dynamo = DynamoDbClient.create();
    private final EcsClient ecs = EcsClient.create();

    /**
     * @param event   alarm event; the alarm name is read from detail.alarmName
     * @param context Lambda context (unused)
     * @return map with "action" (human-readable summary) and "status"
     *         ("remediated" when capacity was raised, "skipped" for on-demand tables)
     */
    @Override
    public Map<String, String> handleRequest(Map<String, Object> event, Context context) {
        String alarmName = getNestedValue(event, "detail", "alarmName");
        String tableName = extractTableName(alarmName);

        TableDescription table = dynamo.describeTable(
            DescribeTableRequest.builder().tableName(tableName).build()).table();

        // A missing billing-mode summary is treated as PROVISIONED, as in the
        // Python handler.
        BillingModeSummary billing = table.billingModeSummary();
        if (billing != null && billing.billingMode() == BillingMode.PAY_PER_REQUEST) {
            return Map.of(
                "action", "Table is on-demand — checking account-level limits",
                "status", "skipped");
        }

        long currentRcu = table.provisionedThroughput().readCapacityUnits();
        long currentWcu = table.provisionedThroughput().writeCapacityUnits();

        // Integer ceil of 1.5x; truncating (long)(n * 1.5) leaves a capacity
        // of 1 unchanged.
        long newRcu = (currentRcu * 3 + 1) / 2;
        long newWcu = (currentWcu * 3 + 1) / 2;

        dynamo.updateTable(UpdateTableRequest.builder()
            .tableName(tableName)
            .provisionedThroughput(ProvisionedThroughput.builder()
                .readCapacityUnits(newRcu)
                .writeCapacityUnits(newWcu)
                .build())
            .build());

        String action = String.format("Scaled %s: RCU %d→%d, WCU %d→%d",
            tableName, currentRcu, newRcu, currentWcu, newWcu);

        return Map.of("action", action, "status", "remediated");
    }
}

Systems Manager Automation Runbooks

For more complex remediation that requires multiple steps with approval gates:

# SSM Automation document, expressed here as a Python dict for illustration
# (the document itself is written in YAML format).
def _build_restart_service_runbook():
    """Assemble the ECS restart runbook from its four steps, in execution order."""
    # Step 1: run an inline script that reports running vs. desired task counts.
    check_service_health = {
        'name': 'CheckServiceHealth',
        'action': 'aws:executeScript',
        'inputs': {
            'Runtime': 'python3.11',
            'Handler': 'check_health',
            'Script': '''
def check_health(events, context):
    import boto3
    ecs = boto3.client("ecs")
    svc = ecs.describe_services(
        cluster=events["ClusterName"],
        services=[events["ServiceName"]]
    )["services"][0]
    return {
        "running": svc["runningCount"],
        "desired": svc["desiredCount"],
        "healthy_ratio": svc["runningCount"] / max(svc["desiredCount"], 1)
    }
''',
            'InputPayload': {
                'ClusterName': '{{ ClusterName }}',
                'ServiceName': '{{ ServiceName }}',
            },
        },
        'outputs': [
            {
                'Name': 'HealthyRatio',
                'Selector': '$.Payload.healthy_ratio',
                'Type': 'String',
            },
        ],
    }

    # Step 2: approval step; the runbook aborts if this step fails.
    approval_gate = {
        'name': 'ApprovalGate',
        'action': 'aws:approve',
        'onFailure': 'Abort',
        'inputs': {
            'Approvers': ['arn:aws:iam::123456789012:role/SRE-Team'],
            'Message': 'Service {{ ServiceName }} is unhealthy. Approve restart?',
            'MinRequiredApprovals': 1,
            'NotificationArn': 'arn:aws:sns:us-east-1:123456789012:sre-approvals',
        },
    }

    # Step 3: call ECS UpdateService with forceNewDeployment set.
    force_new_deployment = {
        'name': 'ForceNewDeployment',
        'action': 'aws:executeAwsApi',
        'inputs': {
            'Service': 'ecs',
            'Api': 'UpdateService',
            'cluster': '{{ ClusterName }}',
            'service': '{{ ServiceName }}',
            'forceNewDeployment': True,
        },
    }

    # Step 4: wait (up to 600 s) for the first deployment's rolloutState to
    # read COMPLETED.
    wait_for_stabilization = {
        'name': 'WaitForStabilization',
        'action': 'aws:waitForAwsResourceProperty',
        'timeoutSeconds': 600,
        'inputs': {
            'Service': 'ecs',
            'Api': 'DescribeServices',
            'cluster': '{{ ClusterName }}',
            'services': ['{{ ServiceName }}'],
            'PropertySelector': '$.services[0].deployments[0].rolloutState',
            'DesiredValues': ['COMPLETED'],
        },
    }

    string_param = 'String'
    return {
        'description': 'Restart unhealthy ECS service with safety checks',
        'schemaVersion': '0.3',
        'assumeRole': '{{ AutomationAssumeRole }}',
        'parameters': {
            'ClusterName': {'type': string_param},
            'ServiceName': {'type': string_param},
            'AutomationAssumeRole': {'type': string_param},
        },
        'mainSteps': [
            check_service_health,
            approval_gate,
            force_new_deployment,
            wait_for_stabilization,
        ],
    }


restart_service_runbook = _build_restart_service_runbook()

Circuit Breaker Pattern

Prevent cascading failures by automatically stopping requests to a failing dependency:

import time
import threading
from enum import Enum
from dataclasses import dataclass, field

class CircuitState(Enum):
    """States a circuit breaker moves through."""
    CLOSED = 'closed'        # Normal operation
    OPEN = 'open'            # Failing, reject immediately
    HALF_OPEN = 'half_open'  # Testing if recovered


class CircuitOpenError(Exception):
    """Raised by CircuitBreaker.call when the circuit is OPEN and the call is rejected."""


@dataclass
class CircuitBreaker:
    """
    Circuit breaker for AWS service calls.
    Prevents thundering herd against a failing dependency.

    The breaker starts CLOSED. After ``failure_threshold`` failures without
    an intervening success it becomes OPEN and rejects calls with
    CircuitOpenError. Once ``recovery_timeout`` seconds have passed since the
    last failure it becomes HALF_OPEN: calls are let through again, and
    ``success_threshold`` successes close the circuit while a single failure
    re-opens it.
    """
    name: str
    failure_threshold: int = 5
    recovery_timeout: int = 30  # seconds
    success_threshold: int = 3  # successes needed to close from half-open

    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _failure_count: int = field(default=0, init=False)
    _success_count: int = field(default=0, init=False)
    # Timestamp of the last failure on the monotonic clock, so wall-clock
    # adjustments cannot shorten or extend the recovery window.
    _last_failure_time: float = field(default=0, init=False)
    # The lock is an implementation detail: keep it out of repr and equality.
    _lock: threading.Lock = field(
        default_factory=threading.Lock, init=False, repr=False, compare=False
    )

    @property
    def state(self) -> CircuitState:
        """Current state; promotes OPEN to HALF_OPEN once the recovery timeout has elapsed."""
        with self._lock:
            if self._state == CircuitState.OPEN:
                if time.monotonic() - self._last_failure_time > self.recovery_timeout:
                    self._state = CircuitState.HALF_OPEN
                    self._success_count = 0
            return self._state

    def call(self, func, *args, **kwargs):
        """
        Execute ``func(*args, **kwargs)`` through the circuit breaker.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitOpenError: if the circuit is OPEN; ``func`` is not called.
            Exception: any exception raised by ``func`` is recorded as a
                failure and re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            with self._lock:
                since_failure = time.monotonic() - self._last_failure_time
            raise CircuitOpenError(
                f"Circuit {self.name} is OPEN. Last failure: "
                f"{since_failure:.0f}s ago"
            )

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Record a success: count towards closing when HALF_OPEN, else reset the failure count."""
        with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._failure_count = 0
            else:
                self._failure_count = 0

    def _on_failure(self):
        """Record a failure: open the circuit at the threshold, or immediately when HALF_OPEN."""
        with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.monotonic()
            if (self._state == CircuitState.HALF_OPEN
                    or self._failure_count >= self.failure_threshold):
                self._state = CircuitState.OPEN

# Usage: one shared breaker guards every call to the payment gateway.
payment_circuit = CircuitBreaker(
    name='payment-gateway',
    failure_threshold=3,
    recovery_timeout=60,
)


def process_payment(order):
    """Charge ``order`` through the payment circuit; queue it for retry if the circuit is open."""
    try:
        outcome = payment_circuit.call(call_payment_api, order)
    except CircuitOpenError:
        # Fallback: queue for later processing
        enqueue_for_retry(order)
        outcome = {
            'status': 'queued',
            'message': 'Payment service temporarily unavailable',
        }
    return outcome

Incident Timeline Reconstruction

When the dust settles, reconstruct what happened:

def reconstruct_incident_timeline(start_time, end_time, resource_arns):
    """
    Build a complete incident timeline from CloudTrail + CloudWatch.

    Collects CloudTrail API calls for every resource in ``resource_arns`` and
    all CloudWatch alarm state changes in the window, merges them in
    chronological order, prints the timeline, and returns it.

    Args:
        start_time: Start of the incident window (datetime).
        end_time: End of the incident window (datetime).
        resource_arns: Resource identifiers to look up in CloudTrail. When
            empty, a single unfiltered CloudTrail lookup is made.

    Returns:
        list of dicts sorted by ``time``. ``api_call`` entries carry
        ``action``, ``user``, ``source`` and ``details``;
        ``alarm_state_change`` entries carry ``alarm`` and ``details``.
    """
    cloudtrail = boto3.client('cloudtrail')
    cloudwatch = boto3.client('cloudwatch')

    timeline = []

    # 1. Get all API calls on affected resources.
    # LookupEvents accepts only one lookup attribute per request, so each
    # resource gets its own query. `or [[]]` makes an empty list produce one
    # unfiltered lookup.
    attribute_sets = [
        [{'AttributeKey': 'ResourceName', 'AttributeValue': arn}]
        for arn in (resource_arns or [])
    ] or [[]]

    lookup_paginator = cloudtrail.get_paginator('lookup_events')
    seen_event_ids = set()  # one event can reference several of the resources
    for lookup_attributes in attribute_sets:
        pages = lookup_paginator.paginate(
            StartTime=start_time,
            EndTime=end_time,
            LookupAttributes=lookup_attributes,
        )
        for page in pages:
            for event in page.get('Events', []):
                event_id = event.get('EventId')
                if event_id is not None:
                    if event_id in seen_event_ids:
                        continue
                    seen_event_ids.add(event_id)
                timeline.append({
                    'time': event['EventTime'],
                    'type': 'api_call',
                    'action': event['EventName'],
                    'user': event.get('Username', 'unknown'),
                    'source': event.get('EventSource'),
                    'details': event.get('CloudTrailEvent', '')
                })

    # 2. Get alarm state changes.
    # DescribeAlarmHistory has no AlarmNamePrefix parameter; omitting
    # AlarmName returns history for all alarms.
    history_pages = cloudwatch.get_paginator('describe_alarm_history').paginate(
        HistoryItemType='StateUpdate',
        StartDate=start_time,
        EndDate=end_time,
    )
    for page in history_pages:
        for item in page.get('AlarmHistoryItems', []):
            timeline.append({
                'time': item['Timestamp'],
                'type': 'alarm_state_change',
                'alarm': item['AlarmName'],
                'details': item['HistorySummary']
            })

    # Sort by time
    timeline.sort(key=lambda x: x['time'])

    print("\n📋 INCIDENT TIMELINE")
    print("=" * 80)
    for entry in timeline:
        emoji = {'api_call': '🔧', 'alarm_state_change': '🚨'}.get(entry['type'], '📝')
        print(f"  {entry['time'].strftime('%H:%M:%S')} {emoji} [{entry['type']}] "
              f"{entry.get('action', entry.get('alarm', ''))}")
        if entry.get('user'):
            print(f"{'':>20} by: {entry['user']}")

    return timeline

The automation maturity ladder:

  1. Alert → human investigates → human remediates
  2. Alert → automated diagnosis → human remediates
  3. Alert → automated remediation → human reviews
  4. Anomaly detection → preemptive remediation → no human involvement for known failures