Middleware factory provides a decorator factory to create your own middleware to run logic before, and after each Lambda invocation synchronously.
Key features¶All examples shared in this documentation are available within the project repository.
You might need a custom middleware to abstract non-functional code. These are often custom authorization or any reusable logic you might need to run before/after a Lambda function invocation.
Middleware with no params¶You can create your own middleware using lambda_handler_decorator
. The decorator factory expects 3 arguments in your function signature:
getting_started_middleware_before_logic_function.pygetting_started_middleware_before_logic_payload.json
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
from dataclasses import dataclass, field
from typing import Callable
from uuid import uuid4
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.jmespath_utils import (
envelopes,
query,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
@dataclass
class Payment:
user_id: str
order_id: str
amount: float
status_id: str
payment_id: str = field(default_factory=lambda: f"{uuid4()}")
class PaymentError(Exception): ...
@lambda_handler_decorator
def middleware_before(
handler: Callable[[dict, LambdaContext], dict],
event: dict,
context: LambdaContext,
) -> dict:
# extract payload from a EventBridge event
detail: dict = query(data=event, envelope=envelopes.EVENTBRIDGE)
# check if status_id exists in payload, otherwise add default state before processing payment
if "status_id" not in detail:
event["detail"]["status_id"] = "pending"
return handler(event, context)
@middleware_before
def lambda_handler(event: dict, context: LambdaContext) -> dict:
try:
payment_payload: dict = query(data=event, envelope=envelopes.EVENTBRIDGE)
return {
"order": Payment(**payment_payload).__dict__,
"message": "payment created",
"success": True,
}
except Exception as e:
raise PaymentError("Unable to create payment") from e
1 2 3 4 5 6 7 8 9 10 11 12 13 14
{
"version": "0",
"id": "9c95e8e4-96a4-ef3f-b739-b6aa5b193afb",
"detail-type": "PaymentCreated",
"source": "app.payment",
"account": "0123456789012",
"time": "2022-08-08T20:41:53Z",
"region": "eu-east-1",
"detail": {
"amount": "150.00",
"order_id": "8f1f1710-1b30-48a5-a6bd-153fd23b866b",
"user_id": "f80e3c51-5b8c-49d5-af7d-c7804966235f"
}
}
Middleware with after logic¶
getting_started_middleware_after_logic_function.pygetting_started_middleware_after_logic_payload.json
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
import time
from typing import Callable
import requests
from requests import Response
from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.typing import LambdaContext
app = APIGatewayRestResolver()
@lambda_handler_decorator
def middleware_after(
handler: Callable[[dict, LambdaContext], dict],
event: dict,
context: LambdaContext,
) -> dict:
start_time = time.time()
response = handler(event, context)
execution_time = time.time() - start_time
# adding custom headers in response object after lambda executing
response["headers"]["execution_time"] = execution_time
response["headers"]["aws_request_id"] = context.aws_request_id
return response
@app.post("/todos")
def create_todo() -> dict:
todo_data: dict = app.current_event.json_body # deserialize json str to dict
todo: Response = requests.post("https://jsonplaceholder.typicode.com/todos", data=todo_data)
todo.raise_for_status()
return {"todo": todo.json()}
@middleware_after
def lambda_handler(event: dict, context: LambdaContext) -> dict:
return app.resolve(event, context)
{
"resource": "/todos",
"path": "/todos",
"httpMethod": "POST",
"body": "{\"title\": \"foo\", \"userId\": 1, \"completed\": false}"
}
Middleware with params¶
You can also have your own keyword arguments after the mandatory arguments.
getting_started_middleware_with_params_function.pygetting_started_middleware_with_params_payload.json
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
import base64
from dataclasses import dataclass, field
from typing import Any, Callable, List
from uuid import uuid4
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.jmespath_utils import (
envelopes,
query,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
@dataclass
class Booking:
days: int
date_from: str
date_to: str
hotel_id: int
country: str
city: str
guest: dict
booking_id: str = field(default_factory=lambda: f"{uuid4()}")
class BookingError(Exception): ...
@lambda_handler_decorator
def obfuscate_sensitive_data(
handler: Callable[[dict, LambdaContext], dict],
event: dict,
context: LambdaContext,
fields: List,
) -> dict:
# extracting payload from a EventBridge event
detail: dict = query(data=event, envelope=envelopes.EVENTBRIDGE)
guest_data: Any = detail.get("guest")
# Obfuscate fields (email, vat, passport) before calling Lambda handler
for guest_field in fields:
if guest_data.get(guest_field):
event["detail"]["guest"][guest_field] = obfuscate_data(str(guest_data.get(guest_field)))
return handler(event, context)
def obfuscate_data(value: str) -> bytes:
# base64 is not effective for obfuscation, this is an example
return base64.b64encode(value.encode("ascii"))
@obfuscate_sensitive_data(fields=["email", "passport", "vat"])
def lambda_handler(event: dict, context: LambdaContext) -> dict:
try:
booking_payload: dict = query(data=event, envelope=envelopes.EVENTBRIDGE)
return {
"book": Booking(**booking_payload).__dict__,
"message": "booking created",
"success": True,
}
except Exception as e:
raise BookingError("Unable to create booking") from e
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
{
"version": "0",
"id": "9c95e8e4-96a4-ef3f-b739-b6aa5b193afb",
"detail-type": "BookingCreated",
"source": "app.booking",
"account": "0123456789012",
"time": "2022-08-08T20:41:53Z",
"region": "eu-east-1",
"detail": {
"days": 5,
"date_from": "2020-08-08",
"date_to": "2020-08-13",
"hotel_id": "1",
"country": "Portugal",
"city": "Lisbon",
"guest": {
"name": "Lambda",
"email": "lambda@powertool.tools",
"passport": "AA123456",
"vat": "123456789"
}
}
}
Environment variables¶
The following environment variable is available to configure the middleware factory at a global scope:
Setting Description Environment variable Default Middleware Trace Creates sub-segment for each custom middleware.POWERTOOLS_TRACE_MIDDLEWARES
false
You can also use POWERTOOLS_TRACE_MIDDLEWARES
on a per-method basis, which will consequently override the environment variable value.
For advanced use cases, you can instantiate Tracer inside your middleware, and add annotations as well as metadata for additional operational insights.
advanced_middleware_tracer_function.pyadvanced_middleware_tracer_payload.json
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
import time
from typing import Callable
import requests
from requests import Response
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer()
app = APIGatewayRestResolver()
@lambda_handler_decorator(trace_execution=True)
def middleware_with_advanced_tracing(
handler: Callable[[dict, LambdaContext], dict],
event: dict,
context: LambdaContext,
) -> dict:
tracer.put_metadata(key="resource", value=event.get("resource"))
start_time = time.time()
response = handler(event, context)
execution_time = time.time() - start_time
tracer.put_annotation(key="TotalExecutionTime", value=str(execution_time))
# adding custom headers in response object after lambda executing
response["headers"]["execution_time"] = execution_time
response["headers"]["aws_request_id"] = context.aws_request_id
return response
@app.get("/products")
def create_product() -> dict:
product: Response = requests.get("https://dummyjson.com/products/1")
product.raise_for_status()
return {"product": product.json()}
@middleware_with_advanced_tracing
def lambda_handler(event: dict, context: LambdaContext) -> dict:
return app.resolve(event, context)
{
"resource": "/products",
"path": "/products",
"httpMethod": "GET"
}
Tracing middleware execution¶
If you are making use of Tracer, you can trace the execution of your middleware to ease operations.
This makes use of an existing Tracer instance that you may have initialized anywhere in your code.
WarningYou must enable Active Tracing in your Lambda function when using this feature, otherwise Lambda cannot send traces to XRay.
getting_started_middleware_tracer_function.pygetting_started_middleware_tracer_payload.json
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
import time
from typing import Callable
import requests
from requests import Response
from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.typing import LambdaContext
app = APIGatewayRestResolver()
@lambda_handler_decorator(trace_execution=True)
def middleware_with_tracing(
handler: Callable[[dict, LambdaContext], dict],
event: dict,
context: LambdaContext,
) -> dict:
start_time = time.time()
response = handler(event, context)
execution_time = time.time() - start_time
# adding custom headers in response object after lambda executing
response["headers"]["execution_time"] = execution_time
response["headers"]["aws_request_id"] = context.aws_request_id
return response
@app.get("/products")
def create_product() -> dict:
product: Response = requests.get("https://dummyjson.com/products/1")
product.raise_for_status()
return {"product": product.json()}
@middleware_with_tracing
def lambda_handler(event: dict, context: LambdaContext) -> dict:
return app.resolve(event, context)
{
"resource": "/products",
"path": "/products",
"httpMethod": "GET"
}
When executed, your middleware name will appear in AWS X-Ray Trace details as ## middleware_name
, in this example the middleware name is ## middleware_with_tracing
.
You can create your own middleware and combine many features of Powertools for AWS Lambda (Python) such as trace, logs, feature flags, validation, jmespath_functions and others to abstract non-functional code.
In the example below, we create a Middleware with the following features:
combining_powertools_utilities_function.pycombining_powertools_utilities_schema.pycombining_powertools_utilities_event.jsonSAM TEMPLATE
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
import json
from typing import Callable
from urllib.parse import quote
import boto3
import combining_powertools_utilities_schema as schemas
import requests
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from aws_lambda_powertools.event_handler.exceptions import InternalServerError
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.feature_flags import AppConfigStore, FeatureFlags
from aws_lambda_powertools.utilities.feature_flags.types import JSONType
from aws_lambda_powertools.utilities.jmespath_utils import query
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.validation import SchemaValidationError, validate
app = APIGatewayRestResolver()
tracer = Tracer()
logger = Logger()
table_historic = boto3.resource("dynamodb").Table("HistoricTable")
app_config = AppConfigStore(environment="dev", application="comments", name="features")
feature_flags = FeatureFlags(store=app_config)
@lambda_handler_decorator(trace_execution=True)
def middleware_custom(
handler: Callable[[dict, LambdaContext], dict],
event: dict,
context: LambdaContext,
) -> dict:
# validating the INPUT with the given schema
# X-Customer-Id header must be informed in all requests
try:
validate(event=event, schema=schemas.INPUT)
except SchemaValidationError as e:
return {
"statusCode": 400,
"body": json.dumps(str(e)),
}
# extracting headers and requestContext from event
headers = query(data=event, envelope="headers")
request_context = query(data=event, envelope="requestContext")
logger.debug(f"X-Customer-Id => {headers.get('X-Customer-Id')}")
tracer.put_annotation(key="CustomerId", value=headers.get("X-Customer-Id"))
response = handler(event, context)
# automatically adding security headers to all responses
# see: https://securityheaders.com/
logger.info("Injecting security headers")
response["headers"]["Referrer-Policy"] = "no-referrer"
response["headers"]["Strict-Transport-Security"] = "max-age=15552000; includeSubDomains; preload"
response["headers"]["X-DNS-Prefetch-Control"] = "off"
response["headers"]["X-Content-Type-Options"] = "nosniff"
response["headers"]["X-Permitted-Cross-Domain-Policies"] = "none"
response["headers"]["X-Download-Options"] = "noopen"
logger.info("Saving api call in history table")
save_api_execution_history(str(event.get("path")), headers, request_context)
# return lambda execution
return response
@tracer.capture_method
def save_api_execution_history(path: str, headers: dict, request_context: dict) -> None:
try:
# using the feature flags utility to check if the new feature "save api call to history" is enabled by default
# see: https://docs.powertools.aws.dev/lambda/python/latest/utilities/feature_flags/#static-flags
save_history: JSONType = feature_flags.evaluate(name="save_history", default=False)
if save_history:
# saving history in dynamodb table
tracer.put_metadata(key="execution detail", value=request_context)
table_historic.put_item(
Item={
"customer_id": headers.get("X-Customer-Id"),
"request_id": request_context.get("requestId"),
"path": path,
"request_time": request_context.get("requestTime"),
"source_ip": request_context.get("identity", {}).get("sourceIp"),
"http_method": request_context.get("httpMethod"),
},
)
return None
except Exception:
# you can add more logic here to handle exceptions or even save this to a DLQ
# but not to make this example too long, we just return None since the Lambda has been successfully executed
return None
@app.get("/comments")
@tracer.capture_method
def get_comments():
try:
comments: requests.Response = requests.get("https://jsonplaceholder.typicode.com/comments")
comments.raise_for_status()
return {"comments": comments.json()[:10]}
except Exception as exc:
raise InternalServerError(str(exc)) from exc
@app.get("/comments/<comment_id>")
@tracer.capture_method
def get_comments_by_id(comment_id: str):
try:
comment_id = quote(comment_id, safe="")
comments: requests.Response = requests.get(f"https://jsonplaceholder.typicode.com/comments/{comment_id}")
comments.raise_for_status()
return {"comments": comments.json()}
except Exception as exc:
raise InternalServerError(str(exc)) from exc
@middleware_custom
def lambda_handler(event: dict, context: LambdaContext) -> dict:
return app.resolve(event, context)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
INPUT = {
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://example.com/object1661012141.json",
"title": "Root",
"type": "object",
"required": ["headers"],
"properties": {
"headers": {
"$id": "#root/headers",
"title": "Headers",
"type": "object",
"required": ["X-Customer-Id"],
"properties": {
"X-Customer-Id": {
"$id": "#root/headers/X-Customer-Id",
"title": "X-customer-id",
"type": "string",
"default": "",
"examples": ["1"],
"pattern": "^.*$",
},
},
},
},
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
{
"body":"None",
"headers":{
"Accept":"*/*",
"Accept-Encoding":"gzip, deflate, br",
"Connection":"keep-alive",
"Host":"127.0.0.1:3001",
"Postman-Token":"a9d49365-ebe1-4bb0-8627-d5e37cdce86d",
"User-Agent":"PostmanRuntime/7.29.0",
"X-Customer-Id":"1",
"X-Forwarded-Port":"3001",
"X-Forwarded-Proto":"http"
},
"httpMethod":"GET",
"isBase64Encoded":false,
"multiValueHeaders":{
"Accept":[
"*/*"
],
"Accept-Encoding":[
"gzip, deflate, br"
],
"Connection":[
"keep-alive"
],
"Host":[
"127.0.0.1:3001"
],
"Postman-Token":[
"a9d49365-ebe1-4bb0-8627-d5e37cdce86d"
],
"User-Agent":[
"PostmanRuntime/7.29.0"
],
"X-Customer-Id":[
"1"
],
"X-Forwarded-Port":[
"3001"
],
"X-Forwarded-Proto":[
"http"
]
},
"multiValueQueryStringParameters":"None",
"path":"/comments",
"pathParameters":"None",
"queryStringParameters":"None",
"requestContext":{
"accountId":"123456789012",
"apiId":"1234567890",
"domainName":"127.0.0.1:3001",
"extendedRequestId":"None",
"httpMethod":"GET",
"identity":{
"accountId":"None",
"apiKey":"None",
"caller":"None",
"cognitoAuthenticationProvider":"None",
"cognitoAuthenticationType":"None",
"cognitoIdentityPoolId":"None",
"sourceIp":"127.0.0.1",
"user":"None",
"userAgent":"Custom User Agent String",
"userArn":"None"
},
"path":"/comments",
"protocol":"HTTP/1.1",
"requestId":"56d1a102-6d9d-4f13-b4f7-26751c10a131",
"requestTime":"20/Aug/2022:18:18:58 +0000",
"requestTimeEpoch":1661019538,
"resourceId":"123456",
"resourcePath":"/comments",
"stage":"Prod"
},
"resource":"/comments",
"stageVariables":"None",
"version":"1.0"
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Middleware-powertools-utilities example
Globals:
Function:
Timeout: 5
Runtime: python3.12
Tracing: Active
Architectures:
- x86_64
Environment:
Variables:
POWERTOOLS_LOG_LEVEL: DEBUG
POWERTOOLS_LOGGER_SAMPLE_RATE: 0.1
POWERTOOLS_LOGGER_LOG_EVENT: true
POWERTOOLS_SERVICE_NAME: middleware
Resources:
MiddlewareFunction:
Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
Properties:
CodeUri: middleware/
Handler: app.lambda_handler
Description: Middleware function
Policies:
- AWSLambdaBasicExecutionRole # Managed Policy
- Version: '2012-10-17' # Policy Document
Statement:
- Effect: Allow
Action:
- dynamodb:PutItem
Resource: !GetAtt HistoryTable.Arn
- Effect: Allow
Action: # https://docs.aws.amazon.com/appconfig/latest/userguide/getting-started-with-appconfig-permissions.html
- ssm:GetDocument
- ssm:ListDocuments
- appconfig:GetLatestConfiguration
- appconfig:StartConfigurationSession
- appconfig:ListApplications
- appconfig:GetApplication
- appconfig:ListEnvironments
- appconfig:GetEnvironment
- appconfig:ListConfigurationProfiles
- appconfig:GetConfigurationProfile
- appconfig:ListDeploymentStrategies
- appconfig:GetDeploymentStrategy
- appconfig:GetConfiguration
- appconfig:ListDeployments
- appconfig:GetDeployment
Resource: "*"
Events:
GetComments:
Type: Api
Properties:
Path: /comments
Method: GET
GetCommentsById:
Type: Api
Properties:
Path: /comments/{comment_id}
Method: GET
# DynamoDB table to store historical data
HistoryTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: "HistoryTable"
AttributeDefinitions:
- AttributeName: customer_id
AttributeType: S
- AttributeName: request_id
AttributeType: S
KeySchema:
- AttributeName: customer_id
KeyType: HASH
- AttributeName: request_id
KeyType: "RANGE"
BillingMode: PAY_PER_REQUEST
# Feature flags using AppConfig
FeatureCommentApp:
Type: AWS::AppConfig::Application
Properties:
Description: "Comments Application for feature toggles"
Name: comments
FeatureCommentDevEnv:
Type: AWS::AppConfig::Environment
Properties:
ApplicationId: !Ref FeatureCommentApp
Description: "Development Environment for the App Config Comments"
Name: dev
FeatureCommentConfigProfile:
Type: AWS::AppConfig::ConfigurationProfile
Properties:
ApplicationId: !Ref FeatureCommentApp
Name: features
LocationUri: "hosted"
HostedConfigVersion:
Type: AWS::AppConfig::HostedConfigurationVersion
Properties:
ApplicationId: !Ref FeatureCommentApp
ConfigurationProfileId: !Ref FeatureCommentConfigProfile
Description: 'A sample hosted configuration version'
Content: |
{
"save_history": {
"default": true
}
}
ContentType: 'application/json'
# this is just an example
# change this values according your deployment strategy
BasicDeploymentStrategy:
Type: AWS::AppConfig::DeploymentStrategy
Properties:
Name: "Deployment"
Description: "Deployment strategy for comments app."
DeploymentDurationInMinutes: 1
FinalBakeTimeInMinutes: 1
GrowthFactor: 100
GrowthType: LINEAR
ReplicateTo: NONE
ConfigDeployment:
Type: AWS::AppConfig::Deployment
Properties:
ApplicationId: !Ref FeatureCommentApp
ConfigurationProfileId: !Ref FeatureCommentConfigProfile
ConfigurationVersion: !Ref HostedConfigVersion
DeploymentStrategyId: !Ref BasicDeploymentStrategy
EnvironmentId: !Ref FeatureCommentDevEnv
Tips¶
trace_execution
to quickly understand the performance impact of your middlewares, and reduce or merge tasks when necessaryRetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4