Tracer is an opinionated thin wrapper for AWS X-Ray Python SDK.
Key features¶All examples shared in this documentation are available within the project repository.
Tracer relies on AWS X-Ray SDK over OpenTelememetry Distro (ADOT) for optimal cold start (lower latency).
Install¶This is not necessary if you're installing Powertools for AWS Lambda (Python) via Lambda Layer/SAR
Add aws-lambda-powertools[tracer]
as a dependency in your preferred tool: e.g., requirements.txt, pyproject.toml. This will ensure you have the required dependencies before using Tracer.
Before your use this utility, your AWS Lambda function must have permissions to send traces to AWS X-Ray.
AWS Serverless Application Model (SAM) example1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: Powertools for AWS Lambda (Python) version
Globals:
Function:
Timeout: 5
Runtime: python3.12
Tracing: Active
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: payment
Layers:
# Find the latest Layer version in the official documentation
# https://docs.powertools.aws.dev/lambda/python/latest/#lambda-layer
- !Sub arn:aws:lambda:${AWS::Region}:017000801446:layer:AWSLambdaPowertoolsPythonV3-python312-x86_64:22
Resources:
CaptureLambdaHandlerExample:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../src
Handler: capture_lambda_handler.handler
Lambda handler¶
You can quickly start by initializing Tracer
and use capture_lambda_handler
decorator for your Lambda handler.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer() # Sets service via POWERTOOLS_SERVICE_NAME env var
# OR tracer = Tracer(service="example")
def collect_payment(charge_id: str) -> str:
return f"dummy payment collected for charge: {charge_id}"
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
charge_id = event.get("charge_id", "")
return collect_payment(charge_id=charge_id)
capture_lambda_handler
performs these additional tasks to ease operations:
ColdStart
annotation to easily filter traces that have had an initialization overheadService
annotation if service
parameter or POWERTOOLS_SERVICE_NAME
is setAnnotations are key-values associated with traces and indexed by AWS X-Ray. You can use them to filter traces and to create Trace Groups to slice and dice your transactions.
Adding annotations with put_annotation method1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer()
def collect_payment(charge_id: str) -> str:
tracer.put_annotation(key="PaymentId", value=charge_id)
return f"dummy payment collected for charge: {charge_id}"
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
charge_id = event.get("charge_id", "")
return collect_payment(charge_id=charge_id)
Metadata are key-values also associated with traces but not indexed by AWS X-Ray. You can use them to add additional context for an operation using any native object.
Adding arbitrary metadata with put_metadata method1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer()
def collect_payment(charge_id: str) -> str:
return f"dummy payment collected for charge: {charge_id}"
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
payment_context = {
"charge_id": event.get("charge_id", ""),
"merchant_id": event.get("merchant_id", ""),
"request_id": context.aws_request_id,
}
payment_context["receipt_id"] = collect_payment(charge_id=payment_context["charge_id"])
tracer.put_metadata(key="payment_response", value=payment_context)
return payment_context["receipt_id"]
Synchronous functions¶
You can trace synchronous functions using the capture_method
decorator.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer()
@tracer.capture_method
def collect_payment(charge_id: str) -> str:
tracer.put_annotation(key="PaymentId", value=charge_id)
return f"dummy payment collected for charge: {charge_id}"
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
charge_id = event.get("charge_id", "")
return collect_payment(charge_id=charge_id)
Note: Function responses are auto-captured and stored as JSON, by default.
Use capture_response parameter to override this behaviour.
The serialization is performed by aws-xray-sdk via jsonpickle
module. This can cause side effects for file-like objects like boto S3 StreamingBody
, where its response will be read only once during serialization.
We do not support asynchronous Lambda handler
You can trace asynchronous functions and generator functions (including context managers) using capture_method
.
capture_method_async.pycapture_method_context_manager.pycapture_method_generators.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
import asyncio
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer()
@tracer.capture_method
async def collect_payment(charge_id: str) -> str:
tracer.put_annotation(key="PaymentId", value=charge_id)
await asyncio.sleep(0.5)
return f"dummy payment collected for charge: {charge_id}"
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
charge_id = event.get("charge_id", "")
return asyncio.run(collect_payment(charge_id=charge_id))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
import contextlib
from collections.abc import Generator
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer()
logger = Logger()
@contextlib.contextmanager
@tracer.capture_method
def collect_payment(charge_id: str) -> Generator[str, None, None]:
try:
yield f"dummy payment collected for charge: {charge_id}"
finally:
tracer.put_annotation(key="PaymentId", value=charge_id)
@tracer.capture_lambda_handler
@logger.inject_lambda_context
def lambda_handler(event: dict, context: LambdaContext) -> str:
charge_id = event.get("charge_id", "")
with collect_payment(charge_id=charge_id) as receipt_id:
logger.info(f"Processing payment collection for charge {charge_id} with receipt {receipt_id}")
return receipt_id
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
from collections.abc import Generator
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer()
@tracer.capture_method
def collect_payment(charge_id: str) -> Generator[str, None, None]:
yield f"dummy payment collected for charge: {charge_id}"
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
charge_id = event.get("charge_id", "")
return next(collect_payment(charge_id=charge_id))
Environment variables¶
The following environment variables are available to configure Tracer at a global scope:
Setting Description Environment variable Default Disable Tracing Explicitly disables all tracing.POWERTOOLS_TRACE_DISABLED
false
Response Capture Captures Lambda or method return as metadata. POWERTOOLS_TRACER_CAPTURE_RESPONSE
true
Exception Capture Captures Lambda or method exception as metadata. POWERTOOLS_TRACER_CAPTURE_ERROR
true
Both POWERTOOLS_TRACER_CAPTURE_RESPONSE
and POWERTOOLS_TRACER_CAPTURE_ERROR
can be set on a per-method basis, consequently overriding the environment variable value.
Tracer automatically patches all supported libraries by X-Ray during initialization, by default. Underneath, AWS X-Ray SDK checks whether a supported library has been imported before patching.
If you're looking to shave a few microseconds, or milliseconds depending on your function memory configuration, you can patch specific modules using patch_modules
param:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
import requests
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
MODULES = ["requests"]
tracer = Tracer(patch_modules=MODULES)
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
ret = requests.get("https://httpbin.org/get")
ret.raise_for_status()
return ret.json()
Disabling response auto-capture¶
Use capture_response=False
parameter in both capture_lambda_handler
and capture_method
decorators to instruct Tracer not to serialize function responses as metadata.
message too long
errordisable_capture_response.pydisable_capture_response_streaming_body.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer()
logger = Logger()
@tracer.capture_method(capture_response=False)
def collect_payment(charge_id: str) -> str:
tracer.put_annotation(key="PaymentId", value=charge_id)
logger.debug("Returning sensitive information....")
return f"dummy payment collected for charge: {charge_id}"
@tracer.capture_lambda_handler(capture_response=False)
def lambda_handler(event: dict, context: LambdaContext) -> str:
charge_id = event.get("charge_id", "")
return collect_payment(charge_id=charge_id)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
import os
import boto3
from botocore.response import StreamingBody
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
BUCKET = os.getenv("BUCKET_NAME", "")
REPORT_KEY = os.getenv("REPORT_KEY", "")
tracer = Tracer()
logger = Logger()
session = boto3.session.Session()
s3 = session.client("s3")
@tracer.capture_method(capture_response=False)
def fetch_payment_report(payment_id: str) -> StreamingBody:
ret = s3.get_object(Bucket=BUCKET, Key=f"{REPORT_KEY}/{payment_id}")
logger.debug("Returning streaming body from S3 object....")
return ret["Body"]
@tracer.capture_lambda_handler(capture_response=False)
def lambda_handler(event: dict, context: LambdaContext) -> str:
payment_id = event.get("payment_id", "")
report = fetch_payment_report(payment_id=payment_id)
return report.read().decode()
Disabling exception auto-capture¶
Use capture_error=False
parameter in both capture_lambda_handler
and capture_method
decorators to instruct Tracer not to serialize exceptions as metadata.
Useful when returning sensitive information in exceptions/stack traces you don't control
Disabling exception auto-capture for tracing metadata1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
import os
import requests
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer()
ENDPOINT = os.getenv("PAYMENT_API", "")
class PaymentError(Exception): ...
@tracer.capture_method(capture_error=False)
def collect_payment(charge_id: str) -> dict:
try:
ret = requests.post(url=f"{ENDPOINT}/collect", data={"charge_id": charge_id})
ret.raise_for_status()
return ret.json()
except requests.HTTPError as e:
raise PaymentError(f"Unable to collect payment for charge {charge_id}") from e
@tracer.capture_lambda_handler(capture_error=False)
def lambda_handler(event: dict, context: LambdaContext) -> str:
charge_id = event.get("charge_id", "")
ret = collect_payment(charge_id=charge_id)
return ret.get("receipt_id", "")
Ignoring certain HTTP endpoints¶
You might have endpoints you don't want requests to be traced, perhaps due to the volume of calls or sensitive URLs.
You can use ignore_endpoint
method with the hostname and/or URLs you'd like it to be ignored - globs (*
) are allowed.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
import os
import requests
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
ENDPOINT = os.getenv("PAYMENT_API", "")
IGNORE_URLS = ["/collect", "/refund"]
tracer = Tracer()
tracer.ignore_endpoint(hostname=ENDPOINT, urls=IGNORE_URLS)
tracer.ignore_endpoint(hostname=f"*.{ENDPOINT}", urls=IGNORE_URLS) # `<stage>.ENDPOINT`
class PaymentError(Exception): ...
@tracer.capture_method(capture_error=False)
def collect_payment(charge_id: str) -> dict:
try:
ret = requests.post(url=f"{ENDPOINT}/collect", data={"charge_id": charge_id})
ret.raise_for_status()
return ret.json()
except requests.HTTPError as e:
raise PaymentError(f"Unable to collect payment for charge {charge_id}") from e
@tracer.capture_lambda_handler(capture_error=False)
def lambda_handler(event: dict, context: LambdaContext) -> str:
charge_id = event.get("charge_id", "")
ret = collect_payment(charge_id=charge_id)
return ret.get("receipt_id", "")
Tracing aiohttp requests¶ Info
This snippet assumes you have aiohttp as a dependency
You can use aiohttp_trace_config
function to create a valid aiohttp trace_config object. This is necessary since X-Ray utilizes aiohttp trace hooks to capture requests end-to-end.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
import asyncio
import os
import aiohttp
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.tracing import aiohttp_trace_config
from aws_lambda_powertools.utilities.typing import LambdaContext
ENDPOINT = os.getenv("PAYMENT_API", "")
tracer = Tracer()
@tracer.capture_method
async def collect_payment(charge_id: str) -> dict:
async with aiohttp.ClientSession(trace_configs=[aiohttp_trace_config()]) as session:
async with session.get(f"{ENDPOINT}/collect") as resp:
return await resp.json()
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
charge_id = event.get("charge_id", "")
return asyncio.run(collect_payment(charge_id=charge_id))
Escape hatch mechanism¶
You can use tracer.provider
attribute to access all methods provided by AWS X-Ray xray_recorder
object.
This is useful when you need a feature available in X-Ray that is not available in the Tracer utility, for example thread-safe, or context managers.
Tracing a code block with in_subsegment escape hatch1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer()
def collect_payment(charge_id: str) -> str:
return f"dummy payment collected for charge: {charge_id}"
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
charge_id = event.get("charge_id", "")
with tracer.provider.in_subsegment("## collect_payment") as subsegment:
subsegment.put_annotation(key="PaymentId", value=charge_id)
ret = collect_payment(charge_id=charge_id)
subsegment.put_metadata(key="payment_response", value=ret)
return ret
Concurrent asynchronous functions¶ Warning
X-Ray SDK will raise an exception when async functions are run and traced concurrently
A safe workaround mechanism is to use in_subsegment_async
available via Tracer escape hatch (tracer.provider
).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
import asyncio
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer()
async def another_async_task():
async with tracer.provider.in_subsegment_async("## another_async_task") as subsegment:
subsegment.put_annotation(key="key", value="value")
subsegment.put_metadata(key="key", value="value", namespace="namespace")
...
async def another_async_task_2():
async with tracer.provider.in_subsegment_async("## another_async_task_2") as subsegment:
subsegment.put_annotation(key="key", value="value")
subsegment.put_metadata(key="key", value="value", namespace="namespace")
...
async def collect_payment(charge_id: str) -> str:
await asyncio.gather(another_async_task(), another_async_task_2())
return f"dummy payment collected for charge: {charge_id}"
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
charge_id = event.get("charge_id", "")
return asyncio.run(collect_payment(charge_id=charge_id))
Reusing Tracer across your code¶
Tracer keeps a copy of its configuration after the first initialization. This is useful for scenarios where you want to use Tracer in more than one location across your code base.
Warning: Import order matters when using Lambda Layers or multiple modulesDo not set auto_patch=False
when reusing Tracer in Lambda Layers, or in multiple modules.
This can result in the first Tracer config being inherited by new instances, and their modules not being patched.
Tracer will automatically ignore imported modules that have been patched.
tracer_reuse.pytracer_reuse_module.py
1 2 3 4 5 6 7 8 9 10 11 12
from tracer_reuse_module import collect_payment
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer()
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
charge_id = event.get("charge_id", "")
return collect_payment(charge_id=charge_id)
A new instance of Tracer will be created but will reuse the previous Tracer instance configuration, similar to a Singleton.
from aws_lambda_powertools import Tracer
tracer = Tracer()
@tracer.capture_method
def collect_payment(charge_id: str) -> str:
return f"dummy payment collected for charge: {charge_id}"
Testing your code¶
Tracer is disabled by default when not running in the AWS Lambda environment, including AWS SAM CLI and Chalice environments. This means no code changes or environment variables to be set.
Tips¶RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4