Skip to main content
AWS in the Trenches: Advanced Cloud Engineering for Senior Developers

Infrastructure as Code: Advanced CDK Patterns

8 min read Chapter 19 of 21

Infrastructure as Code: Advanced CDK Patterns

AWS CDK lets you define infrastructure using real programming languages instead of JSON/YAML templates. This means loops, conditionals, inheritance, composition, and testability — all the tools you use for application code, applied to infrastructure. But most CDK code in the wild is terrible: God stacks with 2,000 lines, no abstraction, no testing, and deployment pipelines that take 45 minutes because everything is coupled.

This chapter shows how to write CDK code that scales with your organization.

The Construct Model

CDK has three levels of constructs:

CDK Construct Levels

  • L1 (Cfn*): Raw CloudFormation resources. 1:1 mapping. You set every property manually.
  • L2 (aws-*): Opinionated defaults with sensible APIs. The sweet spot for most work.
  • L3 (Patterns): Multi-resource abstractions. An ApplicationLoadBalancedFargateService creates ALB + ECS Service + Task Definition + Security Groups + CloudWatch Logs in one construct.

The power of CDK is building your own L3 constructs that encode your organization’s standards:

from aws_cdk import (
    Stack, Duration, RemovalPolicy, Tags,
    aws_dynamodb as dynamodb,
    aws_lambda as lambda_,
    aws_sqs as sqs,
    aws_iam as iam,
    aws_cloudwatch as cloudwatch,
    aws_cloudwatch_actions as cw_actions,
    aws_sns as sns,
)
from constructs import Construct

class MonitoredDynamoDBTable(Construct):
    """
    L3 Construct: DynamoDB table with built-in monitoring, alarms, and backup.
    Encodes our organization's standards:
    - Point-in-time recovery always enabled
    - Deletion protection always enabled
    - Table retained when the stack is deleted
    - Standard alarms for PutItem throttling and system errors
    - Auto-tagging with cost center

    Attributes:
        table: The underlying ``dynamodb.Table``.
    """

    def __init__(self, scope: Construct, id: str, *,
                 table_name: str,
                 partition_key: dynamodb.Attribute,
                 sort_key: dynamodb.Attribute = None,
                 billing_mode: dynamodb.BillingMode = dynamodb.BillingMode.PAY_PER_REQUEST,
                 alarm_topic: sns.ITopic,
                 cost_center: str,
                 enable_streams: bool = False):
        """
        Create the table, its two alarms, and its cost tags.

        Args:
            scope: Parent construct.
            id: Construct id within ``scope``.
            table_name: Physical table name; also used as the alarm-name prefix.
            partition_key: Partition key attribute.
            sort_key: Optional sort key attribute (``None`` for no sort key).
            billing_mode: Table billing mode; defaults to on-demand.
            alarm_topic: SNS topic notified when either alarm fires.
            cost_center: Value of the ``cost-center`` tag.
            enable_streams: When true, enable a stream with new and old images.
        """
        super().__init__(scope, id)

        # Table with org standards baked in
        self.table = dynamodb.Table(self, 'Table',
            table_name=table_name,
            partition_key=partition_key,
            sort_key=sort_key,
            billing_mode=billing_mode,
            point_in_time_recovery=True,         # Always
            deletion_protection=True,             # Always (not configurable)
            removal_policy=RemovalPolicy.RETAIN,  # Never delete data on stack deletion
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES if enable_streams else None,
        )

        # Both alarms notify the same topic
        notify_ops = cw_actions.SnsAction(alarm_topic)

        # Standard alarms
        # PutItem throttling: 1-minute metric, 2 of 3 datapoints at threshold 5
        throttle_alarm = self.table.metric_throttled_requests_for_operation(
            'PutItem', period=Duration.minutes(1)
        ).create_alarm(self, 'ThrottleAlarm',
            alarm_name=f'{table_name}-throttling',
            threshold=5,
            evaluation_periods=3,
            datapoints_to_alarm=2,
            treat_missing_data=cloudwatch.TreatMissingData.NOT_BREACHING
        )
        throttle_alarm.add_alarm_action(notify_ops)

        # System errors: 1-minute metric, 2 evaluation periods at threshold 1
        error_alarm = self.table.metric_system_errors_for_operations(
            period=Duration.minutes(1)
        ).create_alarm(self, 'ErrorAlarm',
            alarm_name=f'{table_name}-system-errors',
            threshold=1,
            evaluation_periods=2,
            treat_missing_data=cloudwatch.TreatMissingData.NOT_BREACHING
        )
        error_alarm.add_alarm_action(notify_ops)

        # Cost tags
        Tags.of(self.table).add('cost-center', cost_center)
        Tags.of(self.table).add('managed-by', 'cdk')

    def grant_read_write(self, grantee: iam.IGrantable) -> iam.Grant:
        """
        Grant ``grantee`` read/write data access on the wrapped table.

        Returns the Grant produced by ``Table.grant_read_write_data`` so
        callers can inspect it; previously the result was discarded.
        """
        return self.table.grant_read_write_data(grantee)

    @property
    def table_arn(self) -> str:
        """ARN of the wrapped table."""
        return self.table.table_arn

    @property
    def table_name_output(self) -> str:
        """Name of the wrapped table, as reported by the table construct."""
        return self.table.table_name


# Usage: One line creates a fully-monitored, compliant table
class MyStack(Stack):
    """Example stack: an ops SNS topic plus a monitored ``orders`` table."""

    def __init__(self, scope, id, **kwargs):
        super().__init__(scope, id, **kwargs)

        # Topic that receives the table's alarm notifications
        ops_topic = sns.Topic(self, 'OpsTopic')

        # Composite primary key: string partition key + string sort key
        string_type = dynamodb.AttributeType.STRING
        pk_attribute = dynamodb.Attribute(name='pk', type=string_type)
        sk_attribute = dynamodb.Attribute(name='sk', type=string_type)

        orders_table = MonitoredDynamoDBTable(
            self,
            'OrdersTable',
            table_name='orders',
            partition_key=pk_attribute,
            sort_key=sk_attribute,
            alarm_topic=ops_topic,
            cost_center='order-platform',
            enable_streams=True,
        )
package com.mycompany.constructs;

import software.constructs.Construct;
import software.amazon.awscdk.*;
import software.amazon.awscdk.services.dynamodb.*;
import software.amazon.awscdk.services.cloudwatch.*;
import software.amazon.awscdk.services.cloudwatch.actions.*;
import software.amazon.awscdk.services.sns.*;
import software.amazon.awscdk.services.iam.*;
import java.util.Map;

public class MonitoredDynamoDBTable extends Construct {

    private final Table table;

    public MonitoredDynamoDBTable(Construct scope, String id, MonitoredTableProps props) {
        super(scope, id);

        this.table = Table.Builder.create(this, "Table")
            .tableName(props.getTableName())
            .partitionKey(props.getPartitionKey())
            .sortKey(props.getSortKey())
            .billingMode(BillingMode.PAY_PER_REQUEST)
            .pointInTimeRecovery(true)
            .deletionProtection(true)
            .removalPolicy(RemovalPolicy.RETAIN)
            .stream(props.isEnableStreams()
                ? StreamViewType.NEW_AND_OLD_IMAGES : null)
            .build();

        // Throttling alarm
        Alarm throttleAlarm = this.table.metricThrottledRequestsForOperation(
            "PutItem", MetricOptions.builder().period(Duration.minutes(1)).build()
        ).createAlarm(this, "ThrottleAlarm", CreateAlarmOptions.builder()
            .alarmName(props.getTableName() + "-throttling")
            .threshold(5)
            .evaluationPeriods(3)
            .datapointsToAlarm(2)
            .treatMissingData(TreatMissingData.NOT_BREACHING)
            .build());
        throttleAlarm.addAlarmAction(new SnsAction(props.getAlarmTopic()));

        // Cost tagging
        Tags.of(this.table).add("cost-center", props.getCostCenter());
        Tags.of(this.table).add("managed-by", "cdk");
    }

    public Table getTable() { return this.table; }

    public void grantReadWrite(IGrantable grantee) {
        this.table.grantReadWriteData(grantee);
    }
}

Aspects: Cross-Cutting Concerns

Aspects visit every construct in the tree and can inspect or modify them. Use them for organization-wide policies:

from aws_cdk import IAspect, Annotations
import jsii

@jsii.implements(IAspect)
class SecurityComplianceAspect:
    """
    Aspect that enforces security standards across all resources.
    Applied at the App level, affects every stack.

    Checks performed on L1 (Cfn*) resources:
    - S3 buckets without an encryption configuration -> error annotation
    - DynamoDB tables without SSE enabled -> warning annotation
    - Lambda function URLs with AuthType NONE -> error annotation
    - Lambda functions without Active tracing -> tracing is switched on
    """

    def visit(self, node):
        """Inspect one construct and annotate or fix it if non-compliant."""
        # Enforce encryption on S3 buckets
        if isinstance(node, aws_s3.CfnBucket):
            if not node.bucket_encryption:
                Annotations.of(node).add_error(
                    'S3 bucket must have encryption enabled. '
                    'Use BucketEncryption.S3_MANAGED or KMS.'
                )

        # Enforce encryption at rest on DynamoDB
        elif isinstance(node, dynamodb.CfnTable):
            sse = node.sse_specification
            # getattr: the value may be an unresolved token with no sse_enabled
            if not sse or not getattr(sse, 'sse_enabled', None):
                Annotations.of(node).add_warning(
                    'DynamoDB table should use KMS encryption for compliance.'
                )

        # Block public Lambda function URLs
        elif isinstance(node, lambda_.CfnUrl):
            if node.auth_type == 'NONE':
                Annotations.of(node).add_error(
                    'Lambda function URLs with AuthType=NONE are forbidden. '
                    'Use AWS_IAM auth type.'
                )

        # Ensure all Lambda functions have tracing enabled
        elif isinstance(node, lambda_.CfnFunction):
            tracing = node.tracing_config
            # Accept a plain dict as well as a struct exposing `.mode`, and
            # treat anything else (None, unresolved token) as "not Active".
            if isinstance(tracing, dict):
                mode = tracing.get('mode')
            else:
                mode = getattr(tracing, 'mode', None)
            if mode != 'Active':
                # Auto-fix: enable tracing
                node.add_property_override('TracingConfig.Mode', 'Active')


@jsii.implements(IAspect)
class CostTaggingAspect:
    """Add cost allocation tags to all taggable resources."""

    def __init__(self, team: str, environment: str, project: str) -> None:
        """
        Args:
            team: Value of the ``team`` tag.
            environment: Value of the ``environment`` tag.
            project: Value of the ``project`` tag.
        """
        self.tags = {
            'team': team,
            'environment': environment,
            'project': project,
            'managed-by': 'cdk'
        }

    def visit(self, node):
        """Set the cost tags on ``node`` if it exposes a tag manager."""
        # Imported here so the snippet does not depend on a file-level import.
        from aws_cdk import TagManager

        # Tags.of(node).add() registers a *new* aspect on the node. Doing that
        # from inside visit() adds aspects while aspects are already being
        # applied, which CDK may skip. Write to the TagManager directly.
        tag_manager = TagManager.of(node)
        if tag_manager is None:
            return  # not a taggable resource

        for key, value in self.tags.items():
            tag_manager.set_tag(key, value)


# Build the app, then register the org-wide aspects on its root
app = cdk.App()
stack = MyStack(app, 'Production')

security_aspect = SecurityComplianceAspect()
cost_tagging_aspect = CostTaggingAspect(
    team='platform',
    environment='prod',
    project='order-system',
)

# Aspects attached to the app are visited for EVERY resource in the app
app_aspects = Aspects.of(app)
app_aspects.add(security_aspect)
app_aspects.add(cost_tagging_aspect)

app.synth()

Custom Resources: Filling CloudFormation’s Gaps

Custom Resources execute Lambda functions during stack create/update/delete, enabling operations CloudFormation doesn’t natively support:

from aws_cdk import (
    CustomResource,
    custom_resources as cr,
    aws_lambda as lambda_,
)

class DynamoDBSeedData(Construct):
    """
    Custom Resource: Seed a DynamoDB table with initial data on stack creation.
    On delete: optionally clean up seed data.

    The seed items are JSON-serialized into the custom resource's properties
    and written by an inline Lambda handler on Create and Update requests.
    """

    def __init__(self, scope: Construct, id: str, *,
                 table: dynamodb.Table,
                 seed_data: list[dict]):
        """
        Args:
            scope: Parent construct.
            id: Construct id within ``scope``.
            table: Table to seed; the handler is granted read/write on it.
            seed_data: Items to write; must be JSON-serializable.
        """
        # Needed for json.dumps below; imported locally because the
        # surrounding snippet does not import json at file level.
        import json

        super().__init__(scope, id)

        # Lambda that handles CREATE/UPDATE/DELETE
        handler = lambda_.Function(self, 'Handler',
            runtime=lambda_.Runtime.PYTHON_3_12,
            code=lambda_.Code.from_inline('''
import boto3
import cfnresponse
import json
from decimal import Decimal

def handler(event, context):
    try:
        table_name = event['ResourceProperties']['TableName']
        # The boto3 DynamoDB resource rejects Python floats, so parse
        # fractional numbers as Decimal.
        seed_data = json.loads(event['ResourceProperties']['SeedData'],
                               parse_float=Decimal)
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table(table_name)

        if event['RequestType'] in ['Create', 'Update']:
            with table.batch_writer() as batch:
                for item in seed_data:
                    batch.put_item(Item=item)

            cfnresponse.send(event, context, cfnresponse.SUCCESS,
                           {'ItemCount': str(len(seed_data))})

        elif event['RequestType'] == 'Delete':
            # Optionally clean up seed data
            cfnresponse.send(event, context, cfnresponse.SUCCESS, {})

    except Exception as e:
        cfnresponse.send(event, context, cfnresponse.FAILED,
                        {'Error': str(e)})
'''),
            handler='index.handler',
            timeout=Duration.minutes(5)
        )

        table.grant_read_write_data(handler)

        # The custom resource triggers the Lambda
        CustomResource(self, 'SeedResource',
            service_token=handler.function_arn,
            properties={
                'TableName': table.table_name,
                'SeedData': json.dumps(seed_data),
                # Change this to force re-seeding on stack update
                'Version': '1'
            }
        )

Testing CDK Infrastructure

CDK code is testable with unit tests (fast, no deployment) and integration tests (deploy + verify + destroy):

import pytest
from aws_cdk import App, Stack
from aws_cdk.assertions import Template, Match

def test_monitored_table_creates_alarms():
    """Unit test: the construct synthesizes the expected table, alarms and tags."""
    # Arrange: a bare stack holding one monitored table and its alarm topic
    test_stack = Stack(App(), 'TestStack')
    alarm_topic = sns.Topic(test_stack, 'Topic')
    pk_attribute = dynamodb.Attribute(
        name='pk',
        type=dynamodb.AttributeType.STRING,
    )
    MonitoredDynamoDBTable(
        test_stack,
        'TestTable',
        table_name='test-orders',
        partition_key=pk_attribute,
        alarm_topic=alarm_topic,
        cost_center='test',
    )

    # Act: synthesize to a CloudFormation template
    synthesized = Template.from_stack(test_stack)

    # Assert: PITR and deletion protection are switched on
    expected_table_props = {
        'PointInTimeRecoverySpecification': {
            'PointInTimeRecoveryEnabled': True
        },
        'DeletionProtectionEnabled': True
    }
    synthesized.has_resource_properties('AWS::DynamoDB::Table', expected_table_props)

    # Assert: exactly two alarms exist
    synthesized.resource_count_is('AWS::CloudWatch::Alarm', 2)

    # Assert: one alarm carries the throttling thresholds
    expected_throttle_props = {
        'Threshold': 5,
        'EvaluationPeriods': 3,
        'DatapointsToAlarm': 2
    }
    synthesized.has_resource_properties('AWS::CloudWatch::Alarm', expected_throttle_props)

    # Assert: cost tags are present on the table
    expected_tags = Match.array_with([
        Match.object_like({'Key': 'cost-center', 'Value': 'test'}),
        Match.object_like({'Key': 'managed-by', 'Value': 'cdk'})
    ])
    synthesized.has_resource_properties('AWS::DynamoDB::Table', {'Tags': expected_tags})

def test_security_aspect_blocks_unencrypted_buckets():
    """Test that the security aspect catches violations.

    The original version synthesized the app but asserted nothing, so it
    could never fail. This version checks that the aspect's error annotation
    is attached to the bucket's L1 resource.
    """
    # Aliased to avoid clashing with aws_cdk.Annotations, which the aspect uses.
    from aws_cdk.assertions import Annotations as AnnotationAssertions

    app = App()
    stack = Stack(app, 'TestStack')

    # Create a non-compliant bucket
    aws_s3.Bucket(stack, 'BadBucket',
        encryption=aws_s3.BucketEncryption.UNENCRYPTED
    )

    Aspects.of(app).add(SecurityComplianceAspect())

    # from_stack() synthesizes the app (running the aspects) and exposes
    # the annotations for assertions.
    AnnotationAssertions.from_stack(stack).has_error(
        '/TestStack/BadBucket/Resource',
        Match.string_like_regexp('S3 bucket must have encryption enabled'),
    )
// Java CDK testing with assertions
import org.junit.jupiter.api.Test;
import software.amazon.awscdk.App;
import software.amazon.awscdk.Stack;
import software.amazon.awscdk.assertions.Template;
import software.amazon.awscdk.assertions.Match;
import java.util.Map;

/**
 * Unit tests for {@code MonitoredDynamoDBTable}: each test synthesizes a
 * stack containing one monitored table and asserts on the template.
 */
class MonitoredDynamoDBTableTest {

    /**
     * Builds a fresh app and stack with one SNS topic and one monitored table
     * (name "test-orders", cost center "test") and returns the template.
     */
    private static Template synthesizeMonitoredTable() {
        App app = new App();
        Stack stack = new Stack(app, "TestStack");

        Topic topic = new Topic(stack, "Topic");

        new MonitoredDynamoDBTable(stack, "TestTable", MonitoredTableProps.builder()
            .tableName("test-orders")
            .partitionKey(Attribute.builder().name("pk").type(AttributeType.STRING).build())
            .alarmTopic(topic)
            .costCenter("test")
            .build());

        return Template.fromStack(stack);
    }

    @Test
    void testCreatesTableWithPITR() {
        Template template = synthesizeMonitoredTable();

        template.hasResourceProperties("AWS::DynamoDB::Table", Map.of(
            "PointInTimeRecoverySpecification", Map.of(
                "PointInTimeRecoveryEnabled", true
            ),
            "DeletionProtectionEnabled", true
        ));

        // Verify alarm count
        template.resourceCountIs("AWS::CloudWatch::Alarm", 2);
    }

    @Test
    void testTableHasCostTags() {
        // Uses the shared fixture so the stack actually contains the table;
        // the original placeholder setup asserted against an empty stack.
        Template template = synthesizeMonitoredTable();

        template.hasResourceProperties("AWS::DynamoDB::Table", Map.of(
            "Tags", Match.arrayWith(java.util.List.of(
                Match.objectLike(Map.of("Key", "cost-center", "Value", "test"))
            ))
        ));
    }
}

Multi-Stack Architecture

Split your CDK app into multiple stacks for independent deployability:

class NetworkStack(Stack):
    """VPC, subnets, security groups. Rarely changes."""

    def __init__(self, scope, id, **kwargs):
        super().__init__(scope, id, **kwargs)

        # Shared VPC (max_azs=3), exposed for the other stacks
        shared_vpc = ec2.Vpc(self, 'VPC', max_azs=3)
        self.vpc = shared_vpc

        # Security group created inside that VPC
        self.db_security_group = ec2.SecurityGroup(
            self,
            'DbSg',
            vpc=shared_vpc,
        )

class DataStack(Stack):
    """DynamoDB tables, RDS instances. Changes need careful review."""

    def __init__(self, scope, id, *, vpc, **kwargs):
        super().__init__(scope, id, **kwargs)

        # `...` stands in for the table's remaining arguments (elided here)
        orders = MonitoredDynamoDBTable(self, 'Orders', ...)
        self.orders_table = orders

class ServiceStack(Stack):
    """Lambda functions, API Gateway. Changes frequently, deploys fast."""

    def __init__(self, scope, id, *, table, vpc, **kwargs):
        super().__init__(scope, id, **kwargs)

        # `...` stands in for the function's remaining arguments (elided here)
        handler_fn = lambda_.Function(self, 'Handler', ...)

        # `table` comes from another stack (cross-stack reference)
        table.grant_read_write(handler_fn)

# App composition: each stack receives the constructs it depends on
app = cdk.App()

network = NetworkStack(app, 'Network', env=prod_env)
data = DataStack(
    app,
    'Data',
    vpc=network.vpc,
    env=prod_env,
)
service = ServiceStack(
    app,
    'Service',
    table=data.orders_table,
    vpc=network.vpc,
    env=prod_env,
)

# Deploy independently: cdk deploy Service (only deploys service stack)
# Cross-stack references use CloudFormation exports automatically

CDK anti-patterns to avoid:

  1. One mega-stack with everything (deploy takes 30+ minutes, blast radius is the entire infra)
  2. Using CfnOutput for everything instead of passing construct references
  3. Hardcoding account IDs and regions (use cdk.Environment and context)
  4. Not testing CDK code (treat it like application code — because it is)