Skip to main content
aws in the trenches advanced cloud engineering for senior developers

Lambda Performance Tuning and Cost Optimization

6 min read Chapter 12 of 21

Lambda Performance Tuning and Cost Optimization

Lambda bills per millisecond of execution time, multiplied by the memory allocated. Because Lambda allocates CPU in proportion to configured memory, this creates a non-obvious optimization space: increasing memory raises the per-millisecond price but may shorten duration enough to lower the total cost, while decreasing memory lowers the per-millisecond price but extends duration. The optimal point varies per function and must be measured empirically.

Power Tuning: Finding the Optimal Configuration

AWS Lambda Power Tuning is an open-source Step Functions state machine that invokes your function at different memory levels and reports the cost/performance tradeoff. The script below is a simplified, manual version of the same idea:

import boto3
import json
import time
from concurrent.futures import ThreadPoolExecutor
from statistics import median, mean

def _billed_duration_ms(response):
    """Return the billed duration (ms) found in an invoke response's tail log, or None."""
    import base64

    log_result = response.get('LogResult')
    if not log_result:
        return None
    # The tail log is a truncated window of output, so tolerate a cut-off
    # multi-byte character instead of failing the whole run.
    log = base64.b64decode(log_result).decode(errors='replace')
    for line in log.split('\n'):
        if 'Billed Duration: ' in line:
            return int(line.split('Billed Duration: ')[1].split(' ms')[0])
    return None


def power_tune(function_name: str, payload: dict, memory_levels: list = None,
               invocations_per_level: int = 50):
    """
    Manual power tuning: invoke at each memory level and measure.
    For production use, deploy the official Power Tuning tool.

    Args:
        function_name: Name of the Lambda function to tune. Its memory
            setting is changed during the run and restored afterwards.
        payload: JSON-serializable event sent on every invocation.
        memory_levels: Memory sizes (MB) to test; defaults to
            [128, 256, 512, 1024, 1536, 2048, 3008].
        invocations_per_level: Number of measured invocations per level.

    Returns:
        Dict keyed by memory size with 'avg_duration_ms', 'p99_duration_ms',
        'cost_per_invocation' and 'invocations_per_dollar'. Levels that
        produced no successful measurement are omitted.
    """
    if memory_levels is None:
        memory_levels = [128, 256, 512, 1024, 1536, 2048, 3008]

    # Price: $0.0000166667 per GB-second (x86), converted to GB-millisecond.
    price_per_gb_ms = 0.0000166667 / 1000

    lambda_client = boto3.client('lambda')
    waiter = lambda_client.get_waiter('function_updated_v2')
    encoded_payload = json.dumps(payload)
    results = {}

    # Remember the current setting so the function is not left at whatever
    # memory level happened to be tested last.
    original_memory = lambda_client.get_function_configuration(
        FunctionName=function_name
    )['MemorySize']

    try:
        for memory in memory_levels:
            # Update function memory
            lambda_client.update_function_configuration(
                FunctionName=function_name,
                MemorySize=memory
            )

            # Wait for update to propagate
            waiter.wait(FunctionName=function_name)

            # Run cold start invocation (discard) to warm the new config
            lambda_client.invoke(
                FunctionName=function_name,
                Payload=encoded_payload
            )

            # Measure warm invocations
            durations = []
            for _ in range(invocations_per_level):
                response = lambda_client.invoke(
                    FunctionName=function_name,
                    Payload=encoded_payload,
                    LogType='Tail'
                )
                # A failed invocation (e.g. out of memory at a low setting)
                # is not a valid timing sample.
                if response.get('FunctionError'):
                    continue
                billed = _billed_duration_ms(response)
                if billed is not None:
                    durations.append(billed)

            if not durations:
                print(f"{memory:>5} MB | no successful measurements, skipping")
                continue

            # Calculate cost per invocation at this memory level
            cost_per_ms = (memory / 1024) * price_per_gb_ms
            avg_duration = mean(durations)
            ordered = sorted(durations)
            # Nearest-rank p99: index ceil(0.99 * n) - 1, in integer arithmetic.
            p99_duration = ordered[(99 * len(ordered) + 99) // 100 - 1]
            cost = cost_per_ms * avg_duration

            results[memory] = {
                'avg_duration_ms': round(avg_duration, 1),
                'p99_duration_ms': p99_duration,
                'cost_per_invocation': round(cost, 8),
                'invocations_per_dollar': round(1 / cost) if cost > 0 else 0
            }

            print(f"{memory:>5} MB | avg: {avg_duration:>7.1f}ms | "
                  f"p99: {p99_duration:>5}ms | cost: ${cost:.8f}")
    finally:
        # Restore the original memory size even if a level failed midway.
        lambda_client.update_function_configuration(
            FunctionName=function_name,
            MemorySize=original_memory
        )
        waiter.wait(FunctionName=function_name)

    # Find optimal (lowest cost)
    if results:
        optimal = min(results.items(), key=lambda x: x[1]['cost_per_invocation'])
        print(f"\nOptimal: {optimal[0]} MB (${optimal[1]['cost_per_invocation']:.8f}/invocation)")

    return results

Connection Pooling in Serverless

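Traditional connection pooling doesn’t work in Lambda because each execution environment is isolated: an in-process pool cannot be shared between environments, so every concurrent instance ends up holding its own connection. Instead, use one of the following connection management strategies: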

import os
import psycopg2
from psycopg2 import pool
import boto3

# Problem: Each Lambda instance opens its own connection
# 1000 concurrent Lambda instances = 1000 database connections
# Most databases cap at 100-500 connections

# Solution 1: RDS Proxy (managed connection pooling)
# RDS Proxy sits between Lambda and your database, multiplexing connections
# Configure in Lambda: DB_HOST = rds-proxy-endpoint (not direct RDS endpoint)

# Solution 2: Connection reuse within execution environment
_connection = None

def get_connection():
    """Return the module-level connection, opening a new one only when needed.

    The connection is cached in the module global `_connection`, so later
    invocations in the same execution environment reuse it. A new connection
    is opened when none exists yet or the cached one reports itself closed.
    """
    global _connection

    # Fast path: a cached connection that is still open.
    if _connection is not None and not _connection.closed:
        return _connection

    _connection = psycopg2.connect(
        host=os.environ['DB_HOST'],
        dbname=os.environ['DB_NAME'],
        user=os.environ['DB_USER'],
        password=get_db_password(),
        connect_timeout=5,
        # Critical for Lambda: cap statement runtime at 25 seconds so a slow
        # query cannot block the function past its timeout.
        options='-c statement_timeout=25000'
    )
    _connection.autocommit = True
    return _connection

def handler(event, context):
    """Return the orders for `event['customer_id']` as a JSON response.

    Responds with status 503 without querying when fewer than 5000 ms of
    execution time remain; otherwise responds with status 200 and the
    fetched rows serialized as JSON (non-JSON values are stringified).
    """
    db = get_connection()

    # Ensure we don't start a query if we don't have time to finish it
    if context.get_remaining_time_in_millis() < 5000:
        return {'statusCode': 503, 'body': 'Insufficient time remaining'}

    query = "SELECT * FROM orders WHERE customer_id = %s"
    with db.cursor() as cursor:
        cursor.execute(query, (event['customer_id'],))
        rows = cursor.fetchall()

    serialized = json.dumps(rows, default=str)
    return {'statusCode': 200, 'body': serialized}
import software.amazon.awssdk.services.rdsdata.RdsDataClient;
import software.amazon.awssdk.services.rdsdata.model.*;
import java.util.*;

// Solution 3: RDS Data API (HTTP-based, no connection management needed)
// Tradeoff: Higher per-query latency (~30ms overhead) but zero connection issues
public class RdsDataApiHandler {

    // Data API client — no TCP connection to manage
    private static final RdsDataClient rdsData = RdsDataClient.create();
    private static final String CLUSTER_ARN = System.getenv("CLUSTER_ARN");
    private static final String SECRET_ARN = System.getenv("SECRET_ARN");
    private static final String DATABASE = System.getenv("DATABASE_NAME");

    /**
     * Fetches order_id, status and total for every order of one customer.
     *
     * @param customerId value bound to the {@code :id} parameter of the query
     * @return one unmodifiable map per row, keyed by column name; a value is
     *         {@code null} when the column is SQL NULL
     */
    public List<Map<String, String>> getOrders(String customerId) {
        ExecuteStatementResponse response = rdsData.executeStatement(
            ExecuteStatementRequest.builder()
                .resourceArn(CLUSTER_ARN)
                .secretArn(SECRET_ARN)
                .database(DATABASE)
                .sql("SELECT order_id, status, total FROM orders WHERE customer_id = :id")
                .parameters(SqlParameter.builder()
                    .name("id")
                    .value(Field.builder().stringValue(customerId).build())
                    .build())
                .build());

        // Parse columnar results
        List<Map<String, String>> results = new ArrayList<>();
        for (List<Field> row : response.records()) {
            // Map.of rejects null values, so a NULL or non-string column would
            // throw NullPointerException; build the map explicitly instead.
            Map<String, String> order = new LinkedHashMap<>();
            order.put("order_id", fieldToString(row.get(0)));
            order.put("status", fieldToString(row.get(1)));
            order.put("total", fieldToString(row.get(2)));
            results.add(Collections.unmodifiableMap(order));
        }
        return results;
    }

    /** Renders whichever scalar member of the field is set as text; null for SQL NULL. */
    private static String fieldToString(Field field) {
        if (field == null || Boolean.TRUE.equals(field.isNull())) {
            return null;
        }
        if (field.stringValue() != null) {
            return field.stringValue();
        }
        if (field.longValue() != null) {
            return String.valueOf(field.longValue());
        }
        if (field.doubleValue() != null) {
            return String.valueOf(field.doubleValue());
        }
        if (field.booleanValue() != null) {
            return String.valueOf(field.booleanValue());
        }
        return null;
    }
}

ARM64 (Graviton2) Migration

Lambda supports ARM64 (Graviton2) at 20% lower cost with generally better performance for compute workloads. Migration checklist:

# Check if your dependencies have ARM64 wheels
# Most pure Python packages work without changes
# Compiled packages (numpy, pandas, psycopg2) need ARM64 binaries

import subprocess
import json

def check_arm64_compatibility(requirements_file: str):
    """Check which packages have ARM64 wheels available on PyPI.

    Reads a pip requirements file, looks each package up through the PyPI
    JSON API and prints one status line per package. When a requirement is
    pinned with ``==``, that exact release is inspected; otherwise the
    latest release is.

    Args:
        requirements_file: Path to a requirements.txt-style file.

    Returns:
        Dict mapping package name to True (an aarch64 or pure-Python wheel
        exists), False (no such wheel) or None (the lookup failed).
    """
    import re

    name_pattern = re.compile(r'^[A-Za-z0-9][A-Za-z0-9._-]*')
    # Only an exact pin ("==1.2.3") selects a release; ranges and wildcards
    # fall back to the latest release.
    pin_pattern = re.compile(r'==\s*([A-Za-z0-9][A-Za-z0-9._+!-]*)\s*$')

    packages = []
    with open(requirements_file) as f:
        for raw_line in f:
            line = raw_line.split(' #', 1)[0].strip()  # drop inline comments
            # Skip blanks, comments, pip options (-r, --index-url, ...) and
            # direct URL requirements, none of which name a PyPI project.
            if not line or line.startswith(('#', '-')) or '://' in line:
                continue
            line = line.split(';', 1)[0].strip()  # drop environment markers
            name_match = name_pattern.match(line)
            if name_match is None:
                continue
            pin_match = pin_pattern.search(line)
            version = pin_match.group(1) if pin_match else None
            packages.append((name_match.group(0), version))

    results = {}
    if not packages:
        return results

    import requests

    for package, version in packages:
        if version:
            url = f"https://pypi.org/pypi/{package}/{version}/json"
        else:
            url = f"https://pypi.org/pypi/{package}/json"

        try:
            response = requests.get(url, timeout=10)
        except requests.RequestException as exc:
            print(f"  ❓ {package}: lookup failed ({exc})")
            results[package] = None
            continue

        if response.status_code != 200:
            print(f"  ❓ {package}: not found on PyPI")
            results[package] = None
            continue

        filenames = [entry.get('filename', '') for entry in response.json().get('urls', [])]

        # Either a wheel built for aarch64, or a pure-Python wheel that is
        # architecture-independent.
        has_arm = any(
            filename.endswith('.whl')
            and ('aarch64' in filename or 'none-any' in filename)
            for filename in filenames
        )
        results[package] = has_arm

        status = "✅" if has_arm else "❌"
        print(f"  {status} {package}: {'ARM64 available' if has_arm else 'x86_64 only'}")

    return results

# Deploying for ARM64:
lambda_client = boto3.client('lambda')
# Architecture is set together with the deployment package:
# update_function_configuration has no Architectures parameter, so the change
# goes through update_function_code with a package built for arm64.
with open('my-function-arm64.zip', 'rb') as package:  # placeholder: your arm64 build
    lambda_client.update_function_code(
        FunctionName='my-function',
        ZipFile=package.read(),
        Architectures=['arm64']  # Change from default ['x86_64']
    )
# Layers MUST also be compatible with arm64!
# Rebuild layers with: docker run --platform linux/arm64 ...

Batch Processing: Maximizing Throughput

For event-source mappings (SQS, Kinesis, DynamoDB Streams), batch configuration directly impacts throughput and cost:

# Lambda processes messages in batches — process the ENTIRE batch efficiently

def batch_handler(event, context):
    """
    Processing 100 SQS messages per invocation instead of 1.
    Cost: 1 invocation instead of 100.

    Each record body is JSON-decoded, passed through transform_message and
    written to the 'processed-events' table in chunks of 25. Records that
    fail to transform, or whose chunk still has unprocessed items after the
    retry budget is spent, are reported back as batch item failures.

    Args:
        event: Event whose 'Records' entries carry 'body' and 'messageId'.
        context: Lambda context object (unused).

    Returns:
        {'batchItemFailures': [{'itemIdentifier': <messageId>}, ...]}
    """
    table_name = 'processed-events'
    max_write_attempts = 5
    dynamodb = boto3.resource('dynamodb')

    failed_message_ids = []
    pending_writes = []  # (message_id, put_request) pairs

    for record in event['Records']:
        try:
            body = json.loads(record['body'])
            processed = transform_message(body)
        except Exception as exc:
            # Per-record boundary: one bad message must not fail the batch.
            print(f"Failed to process message {record['messageId']}: {exc}")
            failed_message_ids.append(record['messageId'])
            continue
        pending_writes.append((record['messageId'], {'PutRequest': {'Item': processed}}))

    # Batch write (max 25 items per call)
    for start in range(0, len(pending_writes), 25):
        chunk = pending_writes[start:start + 25]
        request_items = {table_name: [put_request for _, put_request in chunk]}

        for attempt in range(max_write_attempts):
            if attempt:
                # Exponential backoff before retrying unprocessed (throttled)
                # items: 0.1s, 0.2s, 0.4s, 0.8s.
                time.sleep(0.1 * 2 ** (attempt - 1))
            response = dynamodb.meta.client.batch_write_item(
                RequestItems=request_items
            )
            request_items = response.get('UnprocessedItems', {})
            if not request_items:
                break
        else:
            # Retry budget spent. The leftover items are not mapped back to
            # individual messages, so report the whole chunk for redelivery
            # rather than spinning until the function times out.
            failed_message_ids.extend(message_id for message_id, _ in chunk)

    # Report partial failures — only failed messages get retried
    return {
        'batchItemFailures': [
            {'itemIdentifier': msg_id} for msg_id in failed_message_ids
        ]
    }

# Event source mapping config for maximum throughput:
# BatchSize: 100 (SQS max: 10,000 for standard, 10 for FIFO)
# MaximumBatchingWindowInSeconds: 5 (wait to fill batch)
# FunctionResponseTypes: ['ReportBatchItemFailures']

Cost optimization summary:

  1. Power-tune every function (5 minutes of effort, often 30-50% cost reduction)
  2. Use ARM64 for all new functions (20% cheaper, equal or better performance)
  3. Maximize batch sizes for event-source-driven functions
  4. Use Provisioned Concurrency only for latency-critical synchronous paths
  5. Start I/O-bound functions that just call other services at 128 MB, then confirm the setting with power tuning