These examples assume you have already set up tracing in your application.
This guide provides practical examples of using span attributes and metrics to solve common monitoring and debugging challenges in Python applications. Each example demonstrates how to instrument different components, showing how they work together within a distributed trace to provide end-to-end visibility.
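If tracing is not yet enabled, a minimal configuration looks roughly like the following (the DSN is a placeholder; traces_sample_rate=1.0 records every transaction and would normally be lowered in production):

import sentry_sdk

# Minimal tracing setup assumed by the examples below.
# Replace the DSN with your project's DSN.
sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
    traces_sample_rate=1.0,
)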
File Upload and Processing Pipeline
Challenge: Understanding bottlenecks and failures in multi-step file processing operations across request handling and processing services.
Solution: Track the entire file processing pipeline with detailed metrics at each stage, from initial file handling through processing and storage.
Client Application Instrumentation:
import sentry_sdk
import time
from pathlib import Path
import requests
# requests-toolbelt provides an upload monitor whose callback receives the
# bytes sent so far (monitor.bytes_read), matching the tracker below.
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
from contextlib import contextmanager


# Wrapped in a function so the span covers a single upload call.
def upload_profile_image():
    with sentry_sdk.start_span(op="upload", name="Upload File") as span:
        try:
            file_path = Path("/path/to/uploads/user-profile.jpg")

            @contextmanager
            def track_progress(total_size):
                start = time.time()
                bytes_sent = 0

                class ProgressTracker:
                    def __call__(self, monitor):
                        nonlocal bytes_sent
                        bytes_sent = monitor.bytes_read
                        progress_percent = (bytes_sent / total_size) * 100
                        span.set_data("upload.percent_complete", progress_percent)
                        span.set_data("upload.bytes_transferred", bytes_sent)

                yield ProgressTracker()
                span.set_data("upload.total_time_ms", (time.time() - start) * 1000)

            with track_progress(file_path.stat().st_size) as progress_callback:
                with open(file_path, "rb") as f:
                    encoder = MultipartEncoder(fields={"file": ("user-profile.jpg", f)})
                    monitor = MultipartEncoderMonitor(encoder, progress_callback)
                    response = requests.post(
                        "https://api.example.com/upload",
                        data=monitor,
                        headers={
                            "Content-Type": monitor.content_type,
                            # Propagate the trace context to the server.
                            "sentry-trace": sentry_sdk.get_traceparent(),
                        },
                    )

            span.set_data("upload.success", response.ok)
            if response.ok:
                result = response.json()
                span.set_data("upload.server_file_id", result["file_id"])
            return response
        except Exception as error:
            span.set_data("upload.success", False)
            span.set_data("upload.error_type", error.__class__.__name__)
            span.set_data("upload.error_message", str(error))
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            raise
Server Application Instrumentation:
import sentry_sdk
import time
from pathlib import Path
import boto3

with sentry_sdk.start_span(
    op="file.process.service",
    name="File Processing Service"
) as span:
    file_path = Path("/tmp/uploads/user-profile.jpg")
    try:
        with sentry_sdk.start_span(op="scan", name="Virus Scan") as scan_span:
            # Placeholder scan results; a real implementation would invoke the scanner here.
            scan_span.set_data("scan.engine", "clamav")
            scan_span.set_data("scan.result", "clean")

        s3_client = boto3.client('s3', region_name='us-west-2')
        upload_start = time.time()
        s3_client.upload_file(
            str(file_path),
            'my-bucket',
            'uploads/user-profile.jpg'
        )
        span.set_data("storage.actual_upload_time_ms",
                      (time.time() - upload_start) * 1000)
    except Exception as e:
        span.set_status(sentry_sdk.SpanStatus.ERROR)
        span.set_data("error.message", str(e))
        raise
How the Trace Works Together: The client application span initiates the trace and handles the file upload, propagating the trace context to the server through the request headers. The server span continues the trace, processing the file and storing it. This creates a complete picture of the file's journey, allowing you to see upload timing and progress on the client, virus-scan and storage timing on the server, and exactly which stage failed when something goes wrong. A sketch of how the server picks up the trace header follows.
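If the receiving service is not covered by an auto-instrumented framework integration, the incoming trace header can be attached to a new transaction explicitly. This is a minimal sketch assuming a Flask endpoint and the sentry_sdk.continue_trace / start_transaction APIs; the route and handler names are illustrative:

import sentry_sdk
from flask import Flask, request

app = Flask(__name__)

@app.route("/upload", methods=["POST"])
def receive_upload():
    # Build a transaction from the incoming sentry-trace header so the
    # server-side work joins the trace started by the uploading client.
    transaction = sentry_sdk.continue_trace(
        request.headers,
        op="file.process.service",
        name="File Processing Service",
    )
    with sentry_sdk.start_transaction(transaction):
        ...  # virus scan, storage upload, etc. (see the server example above)
    return {"status": "accepted"}, 202

With the Flask or Django integrations enabled, this continuation happens automatically; the explicit form is mainly useful for custom servers or consumers.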
LLM Integration Monitoring
Challenge: Managing cost (token usage) and performance of LLM integrations in Python applications.
Solution: Track the entire LLM interaction flow, from initial request through response processing.
Client Application Instrumentation:
import sentry_sdk
import time
import openai  # pre-1.0 openai client interface (openai.ChatCompletion)
from flask import Flask, request, jsonify

app = Flask(__name__)


@app.route("/ask", methods=["POST"])
def handle_llm_request():
    with sentry_sdk.start_span(op="gen_ai.generate_text", name="Generate Text") as span:
        start_time = time.time() * 1000
        user_input = request.json["question"]
        response_chunks = []
        first_token_received = False
        tokens_received = 0
        try:
            for chunk in openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{"role": "user", "content": user_input}],
                stream=True
            ):
                tokens_received += 1
                content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
                response_chunks.append(content)
                if not first_token_received and content:
                    first_token_received = True
                    time_to_first_token = (time.time() * 1000) - start_time
                    span.set_data("response.time_to_first_token_ms", time_to_first_token)

            total_request_time = (time.time() * 1000) - start_time
            span.set_data("response.total_time_ms", total_request_time)
            span.set_data("response.format", "text")
            span.set_data("response.tokens_received", tokens_received)
            return jsonify({
                "response": "".join(response_chunks),
                "tokens": tokens_received
            })
        except Exception as error:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("error.type", error.__class__.__name__)
            span.set_data("error.message", str(error))
            return jsonify({"error": str(error)}), 500
Server Application Instrumentation:
import sentry_sdk
import time
import openai

# check_rate_limits, enhance_prompt, and calculate_cost are application-specific
# helpers; a sketch of calculate_cost appears below this example.

def process_llm_request(request_data):
    with sentry_sdk.start_span(
        op="gen_ai.generate_text",
        name="Generate Text"
    ) as span:
        start_time = int(time.time() * 1000)
        try:
            rate_limits = check_rate_limits()
            span.set_data("llm.rate_limit_remaining", rate_limits["remaining"])

            prepared_prompt = enhance_prompt(request_data["question"])
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{"role": "system", "content": "You are a helpful assistant."},
                          {"role": "user", "content": prepared_prompt}],
                temperature=0.7,
                max_tokens=4096
            )

            span.set_data("llm.prompt_tokens", response.usage.prompt_tokens)
            span.set_data("llm.completion_tokens", response.usage.completion_tokens)
            span.set_data("llm.total_tokens", response.usage.total_tokens)
            span.set_data("llm.api_latency_ms", int(time.time() * 1000) - start_time)

            cost = calculate_cost(
                response.usage.prompt_tokens,
                response.usage.completion_tokens,
                "gpt-4"
            )
            span.set_data("llm.cost_usd", cost)

            return {
                "response": response.choices[0].message.content,
                "usage": response.usage
            }
        except Exception as error:
            span.set_data("error", True)
            span.set_data("error.type", error.__class__.__name__)
            span.set_data("error.message", str(error))
            is_rate_limit = "rate_limit" in str(error).lower()
            span.set_data("error.is_rate_limit", is_rate_limit)
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            raise
How the Trace Works Together: The client application span captures the initial request handling, while the server span tracks the actual LLM API interaction. The distributed trace shows the complete flow from input to response, enabling you to see time to first token and streaming duration on the client, token counts, rate-limit headroom, and per-request cost on the server, and to tie slow or failed responses back to specific prompts. The cost figure recorded above comes from a helper like the one sketched below.
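The server example calls a calculate_cost helper that is not shown above. A minimal sketch, assuming a hypothetical per-1,000-token price table (the numbers below are placeholders, not current provider pricing):

# Hypothetical pricing, in USD per 1,000 tokens. Placeholder values only.
MODEL_PRICING = {
    "gpt-4": {"prompt": 0.03, "completion": 0.06},
}

def calculate_cost(prompt_tokens, completion_tokens, model):
    """Estimate the USD cost of a single completion request."""
    pricing = MODEL_PRICING[model]
    return (
        prompt_tokens / 1000 * pricing["prompt"]
        + completion_tokens / 1000 * pricing["completion"]
    )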
E-Commerce Purchase Flow
Challenge: Understanding the complete purchase flow and identifying revenue-impacting issues across the application stack.
Solution: Track the full transaction process from API request to order fulfillment.
Client Application Instrumentation:
import sentry_sdk
import time
from django.views import View
from django.http import JsonResponse


class CheckoutView(View):
    # validate_checkout_data and process_order are application-specific methods
    # omitted here for brevity.
    def post(self, request):
        with sentry_sdk.start_span(op="order", name="Process Order") as span:
            validation_start = time.time()
            validation_result = self.validate_checkout_data(request.POST)
            span.set_data("request.validation_time_ms",
                          (time.time() - validation_start) * 1000)

            if not validation_result["valid"]:
                span.set_data("request.validation_success", False)
                span.set_data("request.validation_errors", validation_result["errors"])
                return JsonResponse({"errors": validation_result["errors"]}, status=400)

            try:
                order_result = self.process_order(request)
                span.set_data("order.id", order_result["order_id"])
                span.set_data("order.success", True)

                # Clear the cart once the order has been accepted.
                request.session["cart"] = []
                request.session["cart_total"] = 0

                return JsonResponse({"order_id": order_result["order_id"]})
            except Exception as e:
                span.set_status(sentry_sdk.SpanStatus.ERROR)
                span.set_data("order.success", False)
                span.set_data("error.message", str(e))
                return JsonResponse({"error": str(e)}, status=500)
Server Application Instrumentation:
import sentry_sdk
import stripe
import time
from decimal import Decimal

# check_inventory and create_fulfillment are application-specific helpers
# omitted here for brevity.

def process_order(order_data):
    with sentry_sdk.start_span(
        op="inventory",
        name="Check Inventory"
    ) as span:
        try:
            inventory_start = time.time()
            inventory_result = check_inventory(order_data["items"])
            span.set_data("inventory.check_time_ms",
                          (time.time() - inventory_start) * 1000)
            span.set_data("inventory.all_available", inventory_result["all_available"])
            if not inventory_result["all_available"]:
                span.set_data("inventory.unavailable_items",
                              inventory_result["unavailable_items"])
                raise ValueError("Some items are out of stock")

            payment_start = time.time()
            stripe.api_key = "sk_test_..."
            payment_intent = stripe.PaymentIntent.create(
                amount=int(Decimal(order_data["total"]) * 100),  # Stripe expects cents
                currency="usd",
                payment_method=order_data["payment_method_id"],
                confirm=True
            )
            span.set_data("payment.processing_time_ms",
                          (time.time() - payment_start) * 1000)
            span.set_data("payment.transaction_id", payment_intent.id)
            span.set_data("payment.success", payment_intent.status == "succeeded")

            fulfillment = create_fulfillment(order_data["order_id"])
            span.set_data("fulfillment.id", fulfillment["id"])
            span.set_data("fulfillment.warehouse", fulfillment["warehouse"])
            span.set_data("fulfillment.shipping_method", fulfillment["shipping_method"])
            span.set_data("fulfillment.estimated_delivery",
                          fulfillment["estimated_delivery"].isoformat())

            return {
                "success": True,
                "order_id": order_data["order_id"],
                "payment_id": payment_intent.id,
                "fulfillment_id": fulfillment["id"]
            }
        except Exception as e:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("error.message", str(e))
            span.set_data("order.success", False)
            raise
How the Trace Works Together: The client application span tracks the initial order request, while the server span handles order processing and fulfillment. The distributed trace provides visibility into the entire purchase flow, allowing you to see how long validation, inventory checks, payment, and fulfillment each take, pinpoint the step at which a given order failed, and tie payment or stock issues directly to lost revenue. If the two sides run as separate services, the trace context has to travel with the request, as in the sketch below.
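If the checkout view calls process_order in a separate service over HTTP, the view can forward Sentry's trace headers so both spans land in the same trace. A minimal sketch, assuming a hypothetical internal endpoint (https://orders.internal/process is a placeholder) and the SDK's get_traceparent / get_baggage helpers:

import requests
import sentry_sdk

def call_order_service(order_payload):
    # Forward the current trace context so the order service's spans
    # join the checkout trace.
    headers = {
        "sentry-trace": sentry_sdk.get_traceparent(),
        "baggage": sentry_sdk.get_baggage(),
    }
    return requests.post(
        "https://orders.internal/process",
        json=order_payload,
        headers=headers,
    )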
Distributed Data Processing Pipeline
Challenge: Understanding performance and reliability of distributed data processing pipelines, from job submission through completion.
Solution: Comprehensive tracking of job lifecycle across queue management, processing stages, and worker performance.
Client and Worker Instrumentation:
import sentry_sdk
from celery import shared_task
import time
import os
import psutil

# setup_task_monitoring, parse_data_file, transform_data, validate_data, and
# export_processed_data are application-specific helpers omitted here.

def submit_processing_job(data_file_path, priority="medium"):
    with sentry_sdk.start_span(op="job", name="Process Job") as span:
        try:
            task = process_data_file.apply_async(
                args=[str(data_file_path)],
                kwargs={"priority": priority},
                queue="data_processing"
            )
            span.set_data("job.id", task.id)
            span.set_data("job.submission_success", True)

            monitoring_result = setup_task_monitoring(task.id)
            span.set_data("monitor.callback_url", monitoring_result["callback_url"])

            return {
                "job_id": task.id,
                "status": "submitted",
                "monitoring_url": monitoring_result["status_url"]
            }
        except Exception as e:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("job.submission_success", False)
            span.set_data("error.message", str(e))
            raise


@shared_task(name="tasks.process_data_file")
def process_data_file(file_path, priority="medium"):
    with sentry_sdk.start_span(
        op="process",
        name="Process Data"
    ) as span:
        current_stage = "parse"  # tracked locally so the error handler can report it
        try:
            span.set_data("processing.current_stage", current_stage)
            data = parse_data_file(file_path)

            current_stage = "transform"
            span.set_data("processing.current_stage", current_stage)
            transformed_data = transform_data(data)

            current_stage = "validate"
            span.set_data("processing.current_stage", current_stage)
            validation_result = validate_data(transformed_data)
            span.set_data("validation.errors_count", len(validation_result["errors"]))

            current_stage = "export"
            span.set_data("processing.current_stage", current_stage)
            export_result = export_processed_data(transformed_data)

            span.set_data("resource.cpu_percent", psutil.cpu_percent())
            span.set_data("resource.memory_used_mb",
                          psutil.Process().memory_info().rss / (1024 * 1024))

            span.set_data("outcome.status", "completed")
            span.set_data("outcome.records_processed", len(data))
            span.set_data("outcome.output_size_bytes",
                          os.path.getsize(export_result["output_path"]))

            return {
                "success": True,
                "records_processed": len(data),
                "output_path": export_result["output_path"]
            }
        except Exception as e:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("outcome.status", "failed")
            span.set_data("error.message", str(e))
            span.set_data("error.stage", current_stage)
            raise
How the Trace Works Together: This example shows both the job submission and the processing as a single trace, which is common in Celery and similar distributed task patterns. The spans track the entire job lifecycle, letting you measure each processing stage, monitor worker resource usage, and tie failures back to the stage where they occurred. For the submission span and the worker span to land in the same trace, the Celery integration needs to propagate trace headers, as sketched below.
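Trace propagation between the submitting process and the Celery worker is handled by the SDK's Celery integration. A minimal sketch of the setup both processes would run at startup, assuming a placeholder DSN:

import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
    traces_sample_rate=1.0,
    # propagate_traces=True (the default) attaches the submitter's trace
    # headers to task messages so worker spans join the same trace.
    integrations=[CeleryIntegration(propagate_traces=True)],
)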
For more information about implementing these examples effectively, see our Span Metrics guide, which includes detailed best practices and implementation guidelines.