AWS in the Trenches: Advanced Cloud Engineering for Senior Developers
Automated Incident Response and Runbook Automation
6 min read
Chapter 18 of 21
Automated Incident Response and Runbook Automation
Manual incident response doesn’t scale. When your alarm fires at 3 AM, the on-call engineer must wake up, VPN in, identify the issue, remember the fix, and execute it correctly while half-asleep. Automating the first-response actions — scaling up, failing over, isolating bad instances — buys time for humans to handle the complex cases.
The Automated Response Pipeline
CloudWatch Alarm → SNS → EventBridge Rule → Lambda (remediation) → SNS (notification)
       ↓                                              ↓
[Metric breach]                             [Auto-fix + document]
import boto3
import json
import time
# Lambda: Automated response to DynamoDB throttling
def handle_dynamodb_throttling(event, context):
    """
    Triggered by CloudWatch Alarm for DynamoDB ThrottledRequests.
    Auto-remediation: increase provisioned capacity by 50%.

    The alarm name is read from ``event['detail']['alarmName']`` and mapped
    to a table name by ``extract_table_name_from_alarm``. For a table in
    PROVISIONED billing mode that is ACTIVE, read and write capacity are
    both raised via ``update_table``. On-demand tables and tables that are
    not ACTIVE are left untouched; only the notification and timeline entry
    are produced for them.

    Args:
        event: Alarm event; must contain ``detail.alarmName``.
        context: Lambda context object (unused).

    Returns:
        ``{'statusCode': 200, 'action': <description of what was done>}``.
    """
    alarm_name = event['detail']['alarmName']

    # Extract table name from alarm configuration
    # (We encode it in alarm description or dimensions)
    table_name = extract_table_name_from_alarm(alarm_name)

    dynamodb = boto3.client('dynamodb')

    # Get current table status
    table = dynamodb.describe_table(TableName=table_name)['Table']
    # A missing BillingModeSummary is treated as provisioned mode.
    billing_mode = table.get('BillingModeSummary', {}).get('BillingMode', 'PROVISIONED')
    table_status = table.get('TableStatus', 'ACTIVE')

    if billing_mode != 'PROVISIONED':
        # On-demand mode shouldn't throttle unless hitting account limits
        action_taken = "Table is on-demand — checking account-level limits"
    elif table_status != 'ACTIVE':
        # UpdateTable is rejected while the table is being created/updated;
        # a re-firing alarm must not crash the handler on that.
        action_taken = f"Table status is {table_status} — capacity change skipped"
    else:
        throughput = table['ProvisionedThroughput']
        current_read = throughput['ReadCapacityUnits']
        current_write = throughput['WriteCapacityUnits']
        new_read = _grow_capacity(current_read)
        new_write = _grow_capacity(current_write)

        dynamodb.update_table(
            TableName=table_name,
            ProvisionedThroughput={
                'ReadCapacityUnits': new_read,
                'WriteCapacityUnits': new_write,
            },
        )
        action_taken = f"Increased capacity: RCU {current_read}→{new_read}, WCU {current_write}→{new_write}"

    # Notify the team about the automated action
    notify_team(
        severity='warning',
        title=f'Auto-remediation: DynamoDB throttling on {table_name}',
        details=action_taken,
        alarm_name=alarm_name,
    )

    # Create incident timeline entry
    create_timeline_entry(
        incident_type='dynamodb_throttling',
        resource=table_name,
        action=action_taken,
        automated=True,
    )

    return {'statusCode': 200, 'action': action_taken}


def _grow_capacity(units):
    """Return ``units`` increased by 50% (rounded down), but by at least one unit.

    Plain ``int(units * 1.5)`` maps 1 to 1, which would ask DynamoDB for an
    unchanged throughput; the ``units + 1`` floor guarantees a real increase.
    """
    return max(units + 1, int(units * 1.5))
# Lambda: Auto-remediation for unhealthy ECS tasks
def handle_ecs_health_failure(event, context):
    """
    React to an ECS service whose unhealthy-task count crossed its threshold.

    Reads ``cluster`` and ``service`` from ``event['detail']``, looks the
    service up, and compares its running task count with its desired count:

    * running below half of desired -> force a new deployment;
    * otherwise -> raise the desired count by two as temporary buffer.

    The chosen action is sent to ``notify_team`` and returned as
    ``{'action': <description>}``. ``context`` is unused.
    """
    detail = event['detail']
    cluster = detail['cluster']
    service = detail['service']

    ecs_client = boto3.client('ecs')

    # Look up the service's current task counts.
    described = ecs_client.describe_services(cluster=cluster, services=[service])
    service_info = described['services'][0]
    running_count = service_info['runningCount']
    desired_count = service_info['desiredCount']

    majority_down = running_count < desired_count * 0.5
    if majority_down:
        # More than 50% unhealthy — replace every task.
        ecs_client.update_service(
            cluster=cluster,
            service=service,
            forceNewDeployment=True,
        )
        action = f"Forced new deployment (running: {running_count}/{desired_count})"
    else:
        # Less than 50% unhealthy — add buffer capacity instead.
        boosted_count = desired_count + 2
        ecs_client.update_service(
            cluster=cluster,
            service=service,
            desiredCount=boosted_count,
        )
        action = f"Scaled up: {desired_count} → {boosted_count}"

    notify_team(severity='high', title=f'ECS auto-remediation: {service}', details=action)
    return {'action': action}
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import software.amazon.awssdk.services.ecs.EcsClient;
import software.amazon.awssdk.services.ecs.model.*;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import java.util.Map;
/**
 * Lambda handler that reacts to a DynamoDB throttling alarm by raising the
 * table's provisioned read and write capacity by 50%.
 *
 * <p>Tables in on-demand (PAY_PER_REQUEST) billing mode are left untouched,
 * because they have no provisioned throughput to raise.
 */
public class AutoRemediation implements RequestHandler<Map<String, Object>, Map<String, String>> {
    private final DynamoDbClient dynamo = DynamoDbClient.create();
    // Not used by handleRequest as shown here; kept so the class's fields are unchanged.
    private final EcsClient ecs = EcsClient.create();

    /**
     * Scales up the table named by the alarm in {@code event.detail.alarmName}.
     *
     * @param event   alarm event; {@code detail.alarmName} is read from it
     * @param context Lambda context (unused)
     * @return a map with {@code action} (human-readable description) and
     *         {@code status} ({@code "remediated"} when capacity was raised,
     *         {@code "skipped"} when the table is on-demand)
     */
    @Override
    public Map<String, String> handleRequest(Map<String, Object> event, Context context) {
        String alarmName = getNestedValue(event, "detail", "alarmName");
        String tableName = extractTableName(alarmName);

        TableDescription table = dynamo.describeTable(
                DescribeTableRequest.builder().tableName(tableName).build()).table();

        // A missing billing summary is treated as provisioned mode.
        BillingModeSummary billing = table.billingModeSummary();
        if (billing != null && billing.billingMode() == BillingMode.PAY_PER_REQUEST) {
            // On-demand tables have no provisioned throughput to raise.
            return Map.of(
                    "action", "Table is on-demand — checking account-level limits",
                    "status", "skipped");
        }

        long currentRcu = table.provisionedThroughput().readCapacityUnits();
        long currentWcu = table.provisionedThroughput().writeCapacityUnits();
        long newRcu = growCapacity(currentRcu);
        long newWcu = growCapacity(currentWcu);

        dynamo.updateTable(UpdateTableRequest.builder()
                .tableName(tableName)
                .provisionedThroughput(ProvisionedThroughput.builder()
                        .readCapacityUnits(newRcu)
                        .writeCapacityUnits(newWcu)
                        .build())
                .build());

        String action = String.format("Scaled %s: RCU %d→%d, WCU %d→%d",
                tableName, currentRcu, newRcu, currentWcu, newWcu);
        return Map.of("action", action, "status", "remediated");
    }

    /**
     * Returns {@code units} raised by 50% (rounded down), but by at least one
     * unit: a plain {@code (long) (units * 1.5)} maps 1 to 1, which would
     * request an unchanged throughput.
     */
    private static long growCapacity(long units) {
        return Math.max(units + 1, (long) (units * 1.5));
    }
}
Systems Manager Automation Runbooks
For more complex remediation that requires multiple steps with approval gates:
# SSM Automation Document (YAML format, defined here as dict for illustration)
# The runbook runs four steps in order: check service health, wait for a human
# approval, force a new ECS deployment, then wait for the rollout to complete.
restart_service_runbook = {
'description': 'Restart unhealthy ECS service with safety checks',
'schemaVersion': '0.3',
# The role to assume is supplied by the caller via the AutomationAssumeRole parameter.
'assumeRole': '{{ AutomationAssumeRole }}',
'parameters': {
'ClusterName': {'type': 'String'},
'ServiceName': {'type': 'String'},
'AutomationAssumeRole': {'type': 'String'}
},
'mainSteps': [
{
# Step 1: run an inline script that reports running vs. desired task counts.
'name': 'CheckServiceHealth',
'action': 'aws:executeScript',
'inputs': {
'Runtime': 'python3.11',
'Handler': 'check_health',
# The script calls ecs.describe_services for the given cluster/service and
# returns running, desired and healthy_ratio; max(desired, 1) avoids a
# division by zero when the desired count is 0.
# NOTE(review): the embedded script is Python, so indentation inside this
# string literal is significant — confirm it is intact in the real file.
'Script': '''
def check_health(events, context):
import boto3
ecs = boto3.client("ecs")
svc = ecs.describe_services(
cluster=events["ClusterName"],
services=[events["ServiceName"]]
)["services"][0]
return {
"running": svc["runningCount"],
"desired": svc["desiredCount"],
"healthy_ratio": svc["runningCount"] / max(svc["desiredCount"], 1)
}
''',
# Values passed to the script as its `events` argument.
'InputPayload': {
'ClusterName': '{{ ClusterName }}',
'ServiceName': '{{ ServiceName }}'
}
},
# NOTE(review): HealthyRatio is exported here but no later step in this
# document references it — confirm whether a branch on it was intended.
'outputs': [
{'Name': 'HealthyRatio', 'Selector': '$.Payload.healthy_ratio', 'Type': 'String'}
]
},
{
# Step 2: pause until one listed approver approves; abort the run on failure.
'name': 'ApprovalGate',
'action': 'aws:approve',
'onFailure': 'Abort',
'inputs': {
# NOTE(review): account ID 123456789012 looks like a placeholder — replace
# both ARNs below with real values before use.
'Approvers': ['arn:aws:iam::123456789012:role/SRE-Team'],
'Message': 'Service {{ ServiceName }} is unhealthy. Approve restart?',
'MinRequiredApprovals': 1,
'NotificationArn': 'arn:aws:sns:us-east-1:123456789012:sre-approvals'
}
},
{
# Step 3: call ECS UpdateService with forceNewDeployment to replace tasks.
'name': 'ForceNewDeployment',
'action': 'aws:executeAwsApi',
'inputs': {
'Service': 'ecs',
'Api': 'UpdateService',
'cluster': '{{ ClusterName }}',
'service': '{{ ServiceName }}',
'forceNewDeployment': True
}
},
{
# Step 4: poll ECS DescribeServices until the first listed deployment's
# rolloutState is COMPLETED, giving up after 600 seconds.
'name': 'WaitForStabilization',
'action': 'aws:waitForAwsResourceProperty',
'timeoutSeconds': 600,
'inputs': {
'Service': 'ecs',
'Api': 'DescribeServices',
'cluster': '{{ ClusterName }}',
'services': ['{{ ServiceName }}'],
'PropertySelector': '$.services[0].deployments[0].rolloutState',
'DesiredValues': ['COMPLETED']
}
}
]
}
Circuit Breaker Pattern
Prevent cascading failures by automatically stopping requests to a failing dependency:
import time
import threading
from enum import Enum
from dataclasses import dataclass, field
class CircuitState(Enum):
    """States a CircuitBreaker moves between."""

    CLOSED = 'closed'        # Normal operation
    OPEN = 'open'            # Failing, reject immediately
    HALF_OPEN = 'half_open'  # Testing if recovered


class CircuitOpenError(Exception):
    """Raised by CircuitBreaker.call when a call is rejected because the circuit is OPEN."""


@dataclass
class CircuitBreaker:
    """
    Circuit breaker for AWS service calls.
    Prevents thundering herd against a failing dependency.

    State machine:
      * CLOSED: calls pass through. Each success resets the failure count;
        once ``failure_threshold`` failures accumulate the circuit opens.
      * OPEN: calls are rejected with CircuitOpenError until
        ``recovery_timeout`` seconds have passed since the last failure.
      * HALF_OPEN: calls pass through again. ``success_threshold`` successes
        close the circuit; any failure re-opens it.
    """
    name: str
    failure_threshold: int = 5
    recovery_timeout: int = 30  # seconds
    success_threshold: int = 3  # successes needed to close from half-open
    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _failure_count: int = field(default=0, init=False)
    _success_count: int = field(default=0, init=False)
    # Reading of time.monotonic() at the most recent failure (not an epoch time).
    _last_failure_time: float = field(default=0, init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False)

    def _refresh_locked(self) -> CircuitState:
        """Move OPEN to HALF_OPEN once the recovery timeout has elapsed.

        The caller must already hold ``self._lock``.
        """
        if self._state == CircuitState.OPEN:
            # Monotonic clock: wall-clock adjustments must not shorten or
            # stretch the recovery window.
            if time.monotonic() - self._last_failure_time > self.recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                self._success_count = 0
        return self._state

    @property
    def state(self) -> CircuitState:
        """Current state, after applying any pending OPEN -> HALF_OPEN transition."""
        with self._lock:
            return self._refresh_locked()

    def call(self, func, *args, **kwargs):
        """Execute function through circuit breaker.

        Returns whatever ``func(*args, **kwargs)`` returns.

        Raises:
            CircuitOpenError: the circuit is OPEN; ``func`` is not invoked.
            Exception: anything ``func`` raises is counted as a failure and
                re-raised unchanged.
        """
        # State check and the elapsed-time read happen under one lock so the
        # message cannot mix values from two different failures.
        with self._lock:
            if self._refresh_locked() == CircuitState.OPEN:
                since_failure = time.monotonic() - self._last_failure_time
                raise CircuitOpenError(
                    f"Circuit {self.name} is OPEN. Last failure: "
                    f"{since_failure:.0f}s ago"
                )

        # The lock is not held while func runs, so slow calls do not block
        # other callers.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Record a success: count toward closing in HALF_OPEN, else clear failures."""
        with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._failure_count = 0
            else:
                self._failure_count = 0

    def _on_failure(self):
        """Record a failure and open the circuit if warranted."""
        with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.monotonic()
            # A single failure while probing (HALF_OPEN) re-opens immediately;
            # otherwise open once the threshold is reached.
            if (self._state == CircuitState.HALF_OPEN
                    or self._failure_count >= self.failure_threshold):
                self._state = CircuitState.OPEN
# Usage
payment_circuit = CircuitBreaker(
    name='payment-gateway',
    failure_threshold=3,
    recovery_timeout=60,
)


def process_payment(order):
    """Charge ``order`` through the payment circuit breaker.

    Returns the result of ``call_payment_api(order)`` when the call goes
    through. If the breaker rejects the call with CircuitOpenError, the
    order is handed to ``enqueue_for_retry`` and a ``queued`` status dict is
    returned instead. Any other exception propagates to the caller.
    """
    try:
        outcome = payment_circuit.call(call_payment_api, order)
    except CircuitOpenError:
        # Fallback: queue for later processing
        enqueue_for_retry(order)
        outcome = {'status': 'queued', 'message': 'Payment service temporarily unavailable'}
    return outcome
Incident Timeline Reconstruction
When the dust settles, reconstruct what happened:
def reconstruct_incident_timeline(start_time, end_time, resource_arns):
    """
    Build a complete incident timeline from CloudTrail + CloudWatch.

    Collects CloudTrail events for every entry in ``resource_arns`` and all
    CloudWatch alarm state changes in the window, merges them in time order,
    prints the result and returns it.

    Args:
        start_time: Start of the window (passed to both APIs).
        end_time: End of the window (passed to both APIs).
        resource_arns: Resource names/ARNs to look up in CloudTrail. When
            empty, no CloudTrail lookup is performed.

    Returns:
        A list of dicts sorted by ``'time'``. API-call entries carry
        ``type='api_call'``, ``action``, ``user``, ``source`` and ``details``;
        alarm entries carry ``type='alarm_state_change'``, ``alarm`` and
        ``details``.
    """
    cloudtrail = boto3.client('cloudtrail')
    cloudwatch = boto3.client('cloudwatch')
    timeline = []

    # 1. Get all API calls on affected resources.
    # LookupEvents accepts a single lookup attribute per request, so each
    # resource is queried separately; the paginator follows NextToken so
    # results beyond the first page are not dropped.
    lookup_paginator = cloudtrail.get_paginator('lookup_events')
    seen_event_ids = set()
    for arn in resource_arns:
        pages = lookup_paginator.paginate(
            StartTime=start_time,
            EndTime=end_time,
            LookupAttributes=[
                {'AttributeKey': 'ResourceName', 'AttributeValue': arn}
            ],
        )
        for page in pages:
            for event in page.get('Events', []):
                # One API call can touch several of the listed resources;
                # record it only once.
                event_id = event.get('EventId')
                if event_id is not None:
                    if event_id in seen_event_ids:
                        continue
                    seen_event_ids.add(event_id)
                timeline.append({
                    'time': event['EventTime'],
                    'type': 'api_call',
                    'action': event['EventName'],
                    'user': event.get('Username', 'unknown'),
                    'source': event.get('EventSource'),
                    'details': event.get('CloudTrailEvent', ''),
                })

    # 2. Get alarm state changes.
    # AlarmName is deliberately omitted so history for all alarms is
    # returned; describe_alarm_history has no AlarmNamePrefix parameter.
    history_pages = cloudwatch.get_paginator('describe_alarm_history').paginate(
        HistoryItemType='StateUpdate',
        StartDate=start_time,
        EndDate=end_time,
    )
    for page in history_pages:
        for item in page.get('AlarmHistoryItems', []):
            timeline.append({
                'time': item['Timestamp'],
                'type': 'alarm_state_change',
                'alarm': item['AlarmName'],
                'details': item['HistorySummary'],
            })

    # Sort by time
    timeline.sort(key=lambda x: x['time'])

    print("\n📋 INCIDENT TIMELINE")
    print("=" * 80)
    for entry in timeline:
        emoji = {'api_call': '🔧', 'alarm_state_change': '🚨'}.get(entry['type'], '📝')
        print(f" {entry['time'].strftime('%H:%M:%S')} {emoji} [{entry['type']}] "
              f"{entry.get('action', entry.get('alarm', ''))}")
        # Only API-call entries carry a user.
        if entry.get('user'):
            print(f"{'':>20} by: {entry['user']}")
    return timeline
The automation maturity ladder:
- Alert → human investigates → human remediates
- Alert → automated diagnosis → human remediates
- Alert → automated remediation → human reviews
- Anomaly detection → preemptive remediation → no human involvement for known failures