Introduction
There are many ways to organize your serverless functions. Technically, their name is self-explanatory - they are functions. Provider-agnostic, they are just pieces of code that do something upon execution and can either have a payload or not. However, when you start building a slightly bigger system, you might want to organize them in a more structured way.
Idea Behind
When I first faced a Command/Action
pattern in one of my projects, I was a bit confused.
I mean this is a well-known pattern in software engineering, but I never thought about it in the context of serverless.
Pattern is well known in OOP world, but it can be applied in many other paradigms.
The idea is simple - you have a command that does something.
Some boilerplate code is shared between commands, so for a simple functions it might be an overkill (and overhead).
But as soon as you will have more than a few semantically related (but structurally different) events, you might want to consider this pattern.
Command Pattern
Take a look at this code:
from abc import ABC, abstractmethod
from typing import Dict
# Command interface
class Command(ABC):
@abstractmethod
def execute(self) -> bool:
pass
# Concrete commands
class CreateUserCommand(Command):
def __init__(self, event: dict):
self.user_data = event
def execute(self) -> bool:
print(f"Creating user with data: {self.user_data}")
# Some actual user creation logic here
return True
class DeleteUserCommand(Command):
def __init__(self, event: dict):
self.user_id = event["user_id"]
def execute(self) -> bool:
print(f"Deleting user with ID: {self.user_id}")
# Some actual user deletion logic here
return True
# Event handler that uses commands
class EventHandler:
def __init__(self):
self.command_map: Dict[str, Command] = {}
def register_command(self, event_type: str, command: Command):
self.command_map[event_type] = command
def handle_event(self, event_type: str) -> bool:
if event_type not in self.command_map:
raise ValueError(f"No command registered for event: {event_type}")
command = self.command_map[event_type]
return command.execute()
# Usage example
if __name__ == "__main__":
# Create event handler
handler = EventHandler()
# Register commands for different events
handler.register_command("user_created", CreateUserCommand({"name": "John", "email": "[email protected]"}))
handler.register_command("user_deleted", DeleteUserCommand("user123"))
# Handle events
try:
handler.handle_event("user_created") # This will execute CreateUserCommand
handler.handle_event("user_deleted") # This will execute DeleteUserCommand
except ValueError as e:
print(f"Error: {e}")
Indeed, it looks like a basic OOP example. It starts shining when you have more than a few commands and when you are using a single entry point to handle all events.
This is especially useful in serverless environments where you might have many functions that are triggered by different events. Or in microservices orchestration where you want to have a single entry point to handle all events.
Additionally, it allows to abstract the actual logic from the event handling logic.
You can implement UserCreatedCommand
and UserDeletedCommand
in a way that is completely independent of the event handling logic.
For example, you can inject a test command in your tests to verify that the event handling logic works correctly.
Yes, there are many other ways to do this, many of them looks similar.
Serverless Command Pattern
Same or similar approach is really useful when you use a defined interfaces for your services (think gRPC
or Avro
etc.).
Commands can be easily swapped to test different implementations unless consumer
of the command has not changed or have a strict contract.
It also a good approach in a state machine or workflow engine, where you can define a set of commands that can be executed in a specific order.
The pattern fits naturally because:
- Each state transition can be represented as a command
- Commands can be validated before execution
- Easy to track and monitor state changes
- Supports rollback/compensation actions
Here’s a practical example combining both concepts:
import json
from abc import ABC, abstractmethod
from typing import Dict, Any
from datetime import datetime
import boto3
from aws_lambda_powertools import Logger
logger = Logger()
# Base Command
class Command(ABC):
@abstractmethod
async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
pass
@abstractmethod
def validate(self, payload: Dict[str, Any]) -> bool:
pass
# Concrete Commands
class ProcessOrderCommand(Command):
def __init__(self):
self.dynamodb = boto3.resource('dynamodb')
self.sqs = boto3.client('sqs')
def validate(self, payload: Dict[str, Any]) -> bool:
required_fields = ['orderId', 'userId', 'amount']
return all(field in payload for field in required_fields)
async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
order_data = context['payload']
# Store order in DynamoDB
table = self.dynamodb.Table('Orders')
order_data['status'] = 'PROCESSING'
order_data['timestamp'] = datetime.utcnow().isoformat()
await table.put_item(Item=order_data)
# Send message to payment processing queue
await self.sqs.send_message(
QueueUrl='payment-processing-queue-url',
MessageBody=json.dumps(order_data)
)
return {
'statusCode': 200,
'body': {'orderId': order_data['orderId'], 'status': 'PROCESSING'}
}
class RefundOrderCommand(Command):
def __init__(self):
self.dynamodb = boto3.resource('dynamodb')
self.sns = boto3.client('sns')
def validate(self, payload: Dict[str, Any]) -> bool:
required_fields = ['orderId', 'refundAmount']
return all(field in payload for field in required_fields)
async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
refund_data = context['payload']
# Update order status
table = self.dynamodb.Table('Orders')
await table.update_item(
Key={'orderId': refund_data['orderId']},
UpdateExpression='SET status = :status, refundAmount = :amount',
ExpressionAttributeValues={
':status': 'REFUNDED',
':amount': refund_data['refundAmount']
}
)
# Notify relevant services
await self.sns.publish(
TopicArn='refund-notification-topic',
Message=json.dumps(refund_data)
)
return {
'statusCode': 200,
'body': {'orderId': refund_data['orderId'], 'status': 'REFUNDED'}
}
# Command Factory
class CommandFactory:
_commands: Dict[str, Command] = {
'PROCESS_ORDER': ProcessOrderCommand(),
'REFUND_ORDER': RefundOrderCommand()
}
@classmethod
def get_command(cls, command_type: str) -> Command:
command = cls._commands.get(command_type)
if not command:
raise ValueError(f"Unknown command type: {command_type}")
return command
# Lambda Handler
@logger.inject_lambda_context
def handler(event, context):
try:
# Extract command type and payload from event
command_type = event['commandType']
payload = event['payload']
# Get appropriate command
command = CommandFactory.get_command(command_type)
# Validate payload
if not command.validate(payload):
return {
'statusCode': 400,
'body': 'Invalid payload for command'
}
# Execute command
result = command.execute({'payload': payload, 'context': context})
return result
except Exception as e:
logger.exception("Error processing command")
return {
'statusCode': 500,
'body': str(e)
}
I am using AWS
as an example here, but you can easily adapt it to any other provider or even run it locally.
The rest is on your state machine or workflow engine to handle the commands in a specific order. Each command can provide a response that can be used by the next command in the chain.
That’s it! I hope you find this pattern useful.