Skip to main content
aws in the trenches advanced cloud engineering for senior developers

Step Functions: Orchestrating Complex Workflows

5 min read Chapter 9 of 21

Step Functions: Orchestrating Complex Workflows

Step Functions is AWS’s answer to the invisible choreography problem: when your business process spans 8 services, 3 approval gates, and handles 12 different failure modes, encoding that logic in event reactions scattered across services becomes unmaintainable. Step Functions makes the workflow visible as a state machine — debuggable, modifiable, and testable in one artifact.

Standard vs Express: Choose Your Tradeoffs

| Aspect | Standard | Express (Synchronous & Async) |
|---|---|---|
| Max duration | 1 year | 5 minutes |
| Pricing | Per state transition (~$0.025/1000) | Per execution + duration |
| Execution history | Full, queryable, 90 days | CloudWatch Logs only |
| Max throughput | 2,000 starts/sec | 100,000 starts/sec |
| Idempotency | Built-in (execution name) | Not guaranteed |
| Callback (.waitForTaskToken) and .sync integrations | Supported | Not supported |
| Use case | Long-running business workflows | High-volume data processing |

Step Functions Orchestration

Real-World Workflow: Order Fulfillment

import json

# Step Functions state machine definition (Amazon States Language).
#
# Data flow: a Task/Parallel state without an explicit ResultPath defaults to "$",
# which REPLACES the state's input with its result. Every state below whose result
# would otherwise clobber the order payload ($.order_id, ...) therefore writes into
# its own ResultPath so downstream states still see the order.
order_fulfillment_definition = {
    "Comment": "Order fulfillment with payment, inventory, and shipping",
    "StartAt": "ValidateOrder",
    "States": {
        "ValidateOrder": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:validate-order",
            "Next": "ProcessPayment",
            # Retry transient failures: waits of 2 s, then 4 s, then 8 s.
            "Retry": [{
                "ErrorEquals": ["ServiceUnavailableError"],
                "IntervalSeconds": 2,
                "MaxAttempts": 3,
                "BackoffRate": 2.0
            }],
            "Catch": [{
                "ErrorEquals": ["ValidationError"],
                "Next": "OrderRejected",
                "ResultPath": "$.error"
            }]
        },
        "ProcessPayment": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:process-payment",
            "Next": "ParallelFulfillment",
            "Retry": [{
                "ErrorEquals": ["PaymentGatewayTimeout"],
                "IntervalSeconds": 5,
                "MaxAttempts": 2,
                "BackoffRate": 3.0
            }],
            # Catchers are evaluated in order: the known business error first,
            # everything else falls through to a human.
            "Catch": [{
                "ErrorEquals": ["InsufficientFundsError"],
                "Next": "NotifyPaymentFailed",
                "ResultPath": "$.error"
            }, {
                "ErrorEquals": ["States.ALL"],
                "Next": "ManualReview",
                "ResultPath": "$.error"
            }]
        },
        "ParallelFulfillment": {
            "Type": "Parallel",
            "Branches": [
                {
                    "StartAt": "ReserveInventory",
                    "States": {
                        "ReserveInventory": {
                            "Type": "Task",
                            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:reserve-inventory",
                            "End": True
                        }
                    }
                },
                {
                    "StartAt": "SendConfirmationEmail",
                    "States": {
                        "SendConfirmationEmail": {
                            "Type": "Task",
                            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:send-email",
                            "End": True
                        }
                    }
                },
                {
                    "StartAt": "UpdateAnalytics",
                    "States": {
                        "UpdateAnalytics": {
                            "Type": "Task",
                            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:update-analytics",
                            "End": True
                        }
                    }
                }
            ],
            # A Parallel state's result is a list with one entry per branch. Store it
            # under $.fulfillment so ScheduleShipment still receives the order itself.
            "ResultPath": "$.fulfillment",
            "Next": "ScheduleShipment",
            # Any branch failure cancels the others and compensates the payment.
            "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "Next": "RollbackPayment",
                "ResultPath": "$.error"
            }]
        },
        "ScheduleShipment": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:schedule-shipment",
            "Next": "OrderCompleted"
        },
        "OrderCompleted": {
            "Type": "Succeed"
        },
        # NOTE(review): this terminal state is shared by validation, payment and
        # fulfillment failures, so its Cause/Error misreport the latter two; consider
        # dedicated Fail states if operators key off these values.
        "OrderRejected": {
            "Type": "Fail",
            "Cause": "Order validation failed",
            "Error": "ValidationError"
        },
        "NotifyPaymentFailed": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:notify-payment-failed",
            "Next": "OrderRejected"
        },
        "RollbackPayment": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:rollback-payment",
            "Next": "OrderRejected"
        },
        # Callback pattern: post the task token to SQS and pause until an external
        # reviewer calls SendTaskSuccess / SendTaskFailure. There is no Catch, so a
        # SendTaskFailure or the 24 h timeout (States.Timeout) fails the execution.
        "ManualReview": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
            "Parameters": {
                "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/manual-review",
                "MessageBody": {
                    "taskToken.$": "$$.Task.Token",
                    "orderId.$": "$.order_id",
                    "error.$": "$.error"
                }
            },
            "TimeoutSeconds": 86400,
            # Keep the reviewer's output beside the order. Without this, the loop back
            # to ProcessPayment would receive only the review result, not the order.
            "ResultPath": "$.review",
            "Next": "ProcessPayment"
        }
    }
}
// Java: Starting a Step Functions execution and sending task tokens
import software.amazon.awssdk.services.sfn.SfnClient;
import software.amazon.awssdk.services.sfn.model.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

/**
 * Thin client for the order-fulfillment state machine. It starts executions and completes
 * the {@code .waitForTaskToken} callback used by the manual-review step.
 *
 * <p>An instance built with the no-argument constructor creates its own {@link SfnClient} and
 * releases it in {@link #close()}. A client passed to the two-argument constructor stays owned
 * by the caller and is not closed here.
 */
public class StepFunctionsOrchestrator implements AutoCloseable {

    private static final String DEFAULT_STATE_MACHINE_ARN =
        "arn:aws:states:us-east-1:123456789012:stateMachine:order-fulfillment";

    // Jackson's ObjectMapper is expensive to build and safe to share once configured,
    // so one instance serves every orchestrator.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final SfnClient sfn;
    private final String stateMachineArn;
    private final boolean ownsClient;

    /** Creates an orchestrator with a default {@link SfnClient} and the default state machine ARN. */
    public StepFunctionsOrchestrator() {
        this(SfnClient.create(), DEFAULT_STATE_MACHINE_ARN, true);
    }

    /**
     * Creates an orchestrator that uses a caller-supplied client and state machine.
     *
     * @param sfn client used for all Step Functions calls; not closed by {@link #close()}
     * @param stateMachineArn ARN of the state machine that {@link #startOrderFulfillment} runs
     */
    public StepFunctionsOrchestrator(SfnClient sfn, String stateMachineArn) {
        this(sfn, stateMachineArn, false);
    }

    private StepFunctionsOrchestrator(SfnClient sfn, String stateMachineArn, boolean ownsClient) {
        if (sfn == null) {
            throw new NullPointerException("sfn");
        }
        if (stateMachineArn == null) {
            throw new NullPointerException("stateMachineArn");
        }
        this.sfn = sfn;
        this.stateMachineArn = stateMachineArn;
        this.ownsClient = ownsClient;
    }

    /**
     * Starts one execution of the fulfillment workflow for {@code order}.
     *
     * <p>The execution name {@code "order-<order_id>"} serves as the idempotency key. Step
     * Functions deduplicates or rejects a repeated name instead of starting a second run.
     *
     * @param order order payload; serialized to JSON as the execution input and required to
     *     contain a non-blank {@code "order_id"}
     * @return ARN of the started execution
     * @throws IllegalArgumentException if {@code order_id} is missing or blank
     * @throws Exception if serialization or the StartExecution call fails
     */
    public String startOrderFulfillment(Map<String, Object> order) throws Exception {
        Object orderId = order.get("order_id");
        // Without this check a missing id becomes the name "order-null", so unrelated
        // orders would share one idempotency key and collide with each other.
        if (orderId == null || String.valueOf(orderId).trim().isEmpty()) {
            throw new IllegalArgumentException("order must contain a non-blank \"order_id\"");
        }

        StartExecutionResponse response = sfn.startExecution(StartExecutionRequest.builder()
            .stateMachineArn(stateMachineArn)
            .name("order-" + orderId)  // Idempotency key
            .input(MAPPER.writeValueAsString(order))
            .build());

        return response.executionArn();
    }

    /**
     * Resumes a paused {@code .waitForTaskToken} task with a successful result. This is called
     * by the human reviewer after investigating the order.
     *
     * @param taskToken token received in the manual-review queue message
     * @param reviewResult serialized to JSON and returned as the task's output
     * @throws Exception if serialization or the SendTaskSuccess call fails
     */
    public void approveManualReview(String taskToken, Map<String, Object> reviewResult)
            throws Exception {
        sfn.sendTaskSuccess(SendTaskSuccessRequest.builder()
            .taskToken(taskToken)
            .output(MAPPER.writeValueAsString(reviewResult))
            .build());
    }

    /**
     * Fails a paused {@code .waitForTaskToken} task with error {@code "ReviewRejected"}.
     *
     * @param taskToken token received in the manual-review queue message
     * @param reason human-readable cause recorded on the failure
     */
    public void rejectManualReview(String taskToken, String reason) {
        sfn.sendTaskFailure(SendTaskFailureRequest.builder()
            .taskToken(taskToken)
            .error("ReviewRejected")
            .cause(reason)
            .build());
    }

    /** Closes the underlying client if this instance created it; otherwise does nothing. */
    @Override
    public void close() {
        if (ownsClient) {
            sfn.close();
        }
    }
}

Map State: Processing Collections

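The Map state iterates over an array and runs a sub-workflow for each element. Both modes run iterations concurrently:
- **Inline mode** runs them inside the parent execution, up to 40 at a time.
- **Distributed mode** launches each iteration as a child workflow execution, up to 10,000 concurrently, and can read its items directly from S3: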

# Distributed Map: read up to 10,000 orders from an S3 JSON file and process each one
# in its own child Express execution, with at most 1,000 children running at once.
distributed_map_definition = {
    "StartAt": "ProcessAllOrders",
    "States": {
        "ProcessAllOrders": {
            "Type": "Map",
            # The per-item sub-workflow; DISTRIBUTED mode runs each item as a child
            # workflow execution of the given ExecutionType.
            "ItemProcessor": {
                "ProcessorConfig": {
                    "Mode": "DISTRIBUTED",
                    "ExecutionType": "EXPRESS"
                },
                "StartAt": "ProcessSingleOrder",
                "States": {
                    "ProcessSingleOrder": {
                        "Type": "Task",
                        "Resource": "arn:aws:lambda:us-east-1:123456789012:function:process-single",
                        "End": True
                    }
                }
            },
            # Items come straight from S3, not from the state input; MaxItems caps
            # how many are read.
            "ItemReader": {
                "Resource": "arn:aws:states:::s3:getObject",
                "ReaderConfig": {
                    "InputType": "JSON",
                    "MaxItems": 10000
                },
                "Parameters": {
                    "Bucket": "batch-processing",
                    "Key": "daily-orders.json"
                }
            },
            # Upper bound on simultaneously running child executions.
            "MaxConcurrency": 1000,
            # The Map Run fails only once more than 5% of items have failed.
            "ToleratedFailurePercentage": 5,
            # Child results are written to S3 under this prefix instead of being
            # returned inline.
            "ResultWriter": {
                "Resource": "arn:aws:states:::s3:putObject",
                "Parameters": {
                    "Bucket": "batch-processing",
                    "Prefix": "results/"
                }
            },
            "End": True
        }
    }
}

Callback Pattern: Human-in-the-Loop

The .waitForTaskToken integration pauses the execution until one of two things happens:
- An external system calls SendTaskSuccess or SendTaskFailure.
- The task's TimeoutSeconds or HeartbeatSeconds expires.

Use it to integrate human approvals, external webhooks, or any async process that doesn't fit the Lambda invocation model. It is available only in Standard workflows:

import boto3
import json

# Module-wide Step Functions client shared by the webhook handlers in this module.
sfn = boto3.client(service_name="stepfunctions")

# When a human approves via your internal tool/UI:
def handle_approval_webhook(request, client=None):
    """Complete a pending ``.waitForTaskToken`` callback from your internal approval UI.

    Args:
        request: Mapping with ``task_token``, ``approved`` and ``reviewer_email``
            keys. An optional ``reason`` is included in the failure cause on rejection.
        client: Step Functions client to call. Defaults to the module-level ``sfn``;
            injectable so the handler can be exercised without AWS.

    On approval, calls ``send_task_success`` with a JSON payload holding the reviewer
    and the UTC approval time. On rejection, calls ``send_task_failure`` with error
    ``'ApprovalRejected'``.
    """
    from datetime import datetime, timezone

    stepfunctions = client if client is not None else sfn
    task_token = request['task_token']
    approved = request['approved']
    reviewer = request['reviewer_email']

    if approved:
        # Stamp the actual approval time (ISO-8601, UTC). A constant here would give
        # every approval the same bogus timestamp.
        approved_at = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        stepfunctions.send_task_success(
            taskToken=task_token,
            output=json.dumps({
                'approved': True,
                'reviewer': reviewer,
                'approved_at': approved_at
            })
        )
    else:
        stepfunctions.send_task_failure(
            taskToken=task_token,
            error='ApprovalRejected',
            cause=f'Rejected by {reviewer}: {request.get("reason", "No reason given")}'
        )

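# Important: .waitForTaskToken works only in Standard workflows (Express does not support it).
# A token stays valid until the task completes, its TimeoutSeconds/HeartbeatSeconds expires,
# or the execution ends (at most 1 year).
# Store tokens durably (e.g. DynamoDB) if your approval process takes hours/days.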

Cost Optimization

Step Functions Standard charges per state transition. A 10-state workflow processing 1M orders/month = 10M transitions = $250/month. Strategies to reduce cost:

  1. Batch items before starting executions: Process 100 orders per execution instead of 1
  2. Use Express for high-volume, short-lived workflows: Charges per execution + duration, not transitions
  3. Combine multiple operations into single Lambda tasks: Reduce total state count
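  4. Call AWS APIs through direct SDK integrations instead of wrapping them in Lambda: an arn:aws:states:::dynamodb:putItem task is still a single state transition, but it removes the Lambda invocation and duration charges, the cold start, and the glue code. No .sync suffix is needed: .sync is for waiting on long-running jobs (e.g. Batch, ECS), not for request-response calls like putItem.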
# A Task state that calls DynamoDB PutItem through the built-in service integration.
# No Lambda sits in between, so no function code, cold start, or duration charge.
# The ".$" suffix on a key tells Step Functions to resolve the value as a path:
# "$" paths read the state input, "$$" paths read the context object.
direct_dynamodb_state = {
    "Type": "Task",
    "Resource": "arn:aws:states:::dynamodb:putItem",
    "Parameters": dict(
        TableName="order-status",
        Item=dict(
            pk={"S.$": "$.order_id"},
            sk=dict(S="STATUS"),
            status=dict(S="COMPLETED"),
            completed_at={"S.$": "$$.State.EnteredTime"},
        ),
    ),
    "Next": "Done",
}
# Billing: one state transition for this step, with DynamoDB called directly.