Example Instrumentation | Sentry for Python

Examples of using span metrics to debug performance issues and monitor application behavior across Python applications.

These examples assume you have already set up tracing in your application.
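If tracing is not yet enabled, a minimal setup sketch looks like this (the DSN is a placeholder, and traces_sample_rate=1.0 captures every transaction; lower it in production):

import sentry_sdk

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    traces_sample_rate=1.0,  # sample 100% of transactions; lower in production
)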

This guide provides practical examples of using span attributes and metrics to solve common monitoring and debugging challenges in Python applications. Each example demonstrates how to instrument different components, showing how they work together within a distributed trace to provide end-to-end visibility.
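Every example follows the same basic pattern: open a span with sentry_sdk.start_span() and attach attributes to it with span.set_data(). A minimal sketch of that pattern:

import sentry_sdk

with sentry_sdk.start_span(op="task", name="Example Task") as span:
    span.set_data("task.items_processed", 42)
    span.set_data("task.success", True)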

File Upload and Processing Pipeline

Challenge: Understanding bottlenecks and failures in multi-step file processing operations across request handling and processing services.

Solution: Track the entire file processing pipeline with detailed metrics at each stage, from initial file handling through processing and storage.

Client Application Instrumentation:

import sentry_sdk
import time
from pathlib import Path
import requests
# requests itself does not expose upload progress, so this example uses
# requests_toolbelt's MultipartEncoderMonitor to report bytes as they are sent
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor

def upload_file(file_path):
    with sentry_sdk.start_span(op="upload", name="Upload File") as span:
        try:
            total_size = file_path.stat().st_size
            start = time.time()

            # Record upload progress as span attributes as bytes are sent
            def progress_callback(monitor):
                bytes_sent = monitor.bytes_read
                progress_percent = (bytes_sent / total_size) * 100
                span.set_data("upload.percent_complete", progress_percent)
                span.set_data("upload.bytes_transferred", bytes_sent)

            with open(file_path, "rb") as f:
                encoder = MultipartEncoder(fields={"file": (file_path.name, f)})
                monitor = MultipartEncoderMonitor(encoder, progress_callback)
                response = requests.post(
                    "https://api.example.com/upload",
                    data=monitor,
                    headers={
                        "Content-Type": monitor.content_type,
                        # Propagate the trace context to the server
                        "sentry-trace": sentry_sdk.get_traceparent(),
                    },
                )

            span.set_data("upload.total_time_ms", (time.time() - start) * 1000)
            span.set_data("upload.success", response.ok)
            if response.ok:
                result = response.json()
                span.set_data("upload.server_file_id", result["file_id"])

            return response

        except Exception as error:
            span.set_data("upload.success", False)
            span.set_data("upload.error_type", error.__class__.__name__)
            span.set_data("upload.error_message", str(error))
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            raise

upload_file(Path("/path/to/uploads/user-profile.jpg"))

Server Application Instrumentation:

import sentry_sdk
import time
from pathlib import Path
import boto3

with sentry_sdk.start_span(
    op="file.process.service",
    name="File Processing Service"
) as span:
    file_path = Path("/tmp/uploads/user-profile.jpg")

    try:
        # Scan the uploaded file before storing it
        with sentry_sdk.start_span(op="scan", name="Virus Scan") as scan_span:
            scan_span.set_data("scan.engine", "clamav")
            scan_span.set_data("scan.result", "clean")

        # Upload the file to long-term storage and time the transfer
        s3_client = boto3.client('s3', region_name='us-west-2')
        upload_start = time.time()
        s3_client.upload_file(
            str(file_path),
            'my-bucket',
            'uploads/user-profile.jpg'
        )

        span.set_data("storage.actual_upload_time_ms",
                      (time.time() - upload_start) * 1000)

    except Exception as e:
        span.set_status(sentry_sdk.SpanStatus.ERROR)
        span.set_data("error.message", str(e))
        raise

How the Trace Works Together: The client application span initiates the trace and handles the file upload, propagating the trace context to the server through the request headers. The server span continues the trace, processing the file and storing it. This creates a complete picture of the file's journey, allowing you to pinpoint whether a slow or failed upload stems from the client-side transfer, the virus scan, or the storage write.
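If your server framework is not covered by one of Sentry's automatic integrations, the incoming trace context can be continued explicitly. A minimal sketch, assuming the sentry-sdk 2.x API and an illustrative Flask handler (Sentry's web framework integrations normally do this for you):

import sentry_sdk
from flask import Flask, request

app = Flask(__name__)

@app.route("/upload", methods=["POST"])
def handle_upload():
    # Continue the trace started by the client from the incoming headers
    transaction = sentry_sdk.continue_trace(
        request.headers, op="http.server", name="Handle Upload"
    )
    with sentry_sdk.start_transaction(transaction):
        # ... run the file processing spans shown above ...
        return "ok"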

LLM Integration Monitoring

Challenge: Managing cost (token usage) and performance of LLM integrations in Python applications.

Solution: Track the entire LLM interaction flow, from initial request through response processing.

Client Application Instrumentation:

import sentry_sdk
import time
import openai  # this example uses the legacy openai<1.0 ChatCompletion API
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/ask", methods=["POST"])
def handle_llm_request():
    with sentry_sdk.start_span(op="gen_ai.generate_text", name="Generate Text") as span:
        start_time = time.time() * 1000  # milliseconds

        user_input = request.json["question"]

        response_chunks = []
        first_token_received = False
        tokens_received = 0

        # Stream the completion so time-to-first-token can be measured
        try:
            for chunk in openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{"role": "user", "content": user_input}],
                stream=True
            ):
                tokens_received += 1  # each streamed chunk carries roughly one token
                content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
                response_chunks.append(content)

                # Record how long the first token took to arrive
                if not first_token_received and content:
                    first_token_received = True
                    time_to_first_token = (time.time() * 1000) - start_time
                    span.set_data("response.time_to_first_token_ms", time_to_first_token)

            total_request_time = (time.time() * 1000) - start_time

            span.set_data("response.total_time_ms", total_request_time)
            span.set_data("response.format", "text")
            span.set_data("response.tokens_received", tokens_received)

            return jsonify({
                "response": "".join(response_chunks),
                "tokens": tokens_received
            })

        except Exception as error:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("error.type", error.__class__.__name__)
            span.set_data("error.message", str(error))
            return jsonify({"error": str(error)}), 500

Server Application Instrumentation:

import sentry_sdk
import time
import openai

def process_llm_request(request_data):
    with sentry_sdk.start_span(
        op="gen_ai.generate_text",
        name="Generate Text"
    ) as span:
        start_time = int(time.time() * 1000)  # milliseconds

        try:
            # Check rate limits before calling the API
            # (check_rate_limits is an app-specific helper)
            rate_limits = check_rate_limits()
            span.set_data("llm.rate_limit_remaining", rate_limits["remaining"])

            # Prepare the prompt (enhance_prompt is an app-specific helper)
            prepared_prompt = enhance_prompt(request_data["question"])

            # Call the LLM (legacy openai<1.0 API)
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prepared_prompt}
                ],
                temperature=0.7,
                max_tokens=4096
            )

            # Record token usage and latency reported by the API
            span.set_data("llm.prompt_tokens", response.usage.prompt_tokens)
            span.set_data("llm.completion_tokens", response.usage.completion_tokens)
            span.set_data("llm.total_tokens", response.usage.total_tokens)
            span.set_data("llm.api_latency_ms", int(time.time() * 1000) - start_time)

            # Estimate the request cost (calculate_cost is an app-specific helper)
            cost = calculate_cost(
                response.usage.prompt_tokens,
                response.usage.completion_tokens,
                "gpt-4"
            )
            span.set_data("llm.cost_usd", cost)
            
            return {
                "response": response.choices[0].message.content,
                "usage": response.usage
            }
            
        except Exception as error:
            span.set_data("error", True)
            span.set_data("error.type", error.__class__.__name__)
            span.set_data("error.message", str(error))

            # Flag rate-limit failures separately so they can be alerted on
            is_rate_limit = "rate_limit" in str(error).lower()
            span.set_data("error.is_rate_limit", is_rate_limit)
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            
            raise

How the Trace Works Together: The client application span captures the initial request handling, while the server span tracks the actual LLM API interaction. The distributed trace shows the complete flow from input to response, enabling you to correlate token usage and cost with latency and to see whether slowness originates in your request handling or in the LLM API call itself.
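The calculate_cost helper used above is app-specific. A minimal sketch, assuming a hard-coded price table (the per-token prices below are illustrative, not current provider pricing):

# Illustrative per-1K-token prices; check your provider's current pricing
PRICING_PER_1K = {
    "gpt-4": {"prompt": 0.03, "completion": 0.06},
}

def calculate_cost(prompt_tokens, completion_tokens, model):
    prices = PRICING_PER_1K[model]
    return (prompt_tokens / 1000) * prices["prompt"] \
        + (completion_tokens / 1000) * prices["completion"]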

E-Commerce Transaction Flow

Challenge: Understanding the complete purchase flow and identifying revenue-impacting issues across the application stack.

Solution: Track the full transaction process from API request to order fulfillment.

Client Application Instrumentation:

import sentry_sdk
import time
from django.views import View
from django.http import JsonResponse

class CheckoutView(View):
    def post(self, request):
        with sentry_sdk.start_span(op="order", name="Process Order") as span:
            # Validate the checkout payload and time the validation step
            # (validate_checkout_data is an app-specific helper)
            validation_start = time.time()
            validation_result = self.validate_checkout_data(request.POST)
            span.set_data("request.validation_time_ms",
                          (time.time() - validation_start) * 1000)

            if not validation_result["valid"]:
                span.set_data("request.validation_success", False)
                span.set_data("request.validation_errors", validation_result["errors"])
                return JsonResponse({"errors": validation_result["errors"]}, status=400)

            # Process the order (process_order is an app-specific helper)
            try:
                order_result = self.process_order(request)

                # Record the successful outcome on the span
                span.set_data("order.id", order_result["order_id"])
                span.set_data("order.success", True)

                # Clear the cart after a successful order
                request.session["cart"] = []
                request.session["cart_total"] = 0

                return JsonResponse({"order_id": order_result["order_id"]})

            except Exception as e:
                span.set_status(sentry_sdk.SpanStatus.ERROR)
                span.set_data("order.success", False)
                span.set_data("error.message", str(e))
                return JsonResponse({"error": str(e)}, status=500)

Server Application Instrumentation:

import sentry_sdk
import stripe
import time
from decimal import Decimal

def process_order(order_data):
    with sentry_sdk.start_span(
        op="inventory",
        name="Check Inventory"
    ) as span:
        try:
            # Verify stock before charging the customer
            # (check_inventory is an app-specific helper)
            inventory_start = time.time()
            inventory_result = check_inventory(order_data["items"])
            span.set_data("inventory.check_time_ms", 
                             (time.time() - inventory_start) * 1000)
            span.set_data("inventory.all_available", inventory_result["all_available"])
            
            if not inventory_result["all_available"]:
                span.set_data("inventory.unavailable_items", 
                                 inventory_result["unavailable_items"])
                raise ValueError("Some items are out of stock")
            
            
            # Charge the customer via Stripe
            payment_start = time.time()
            stripe.api_key = "sk_test_..."
            payment_intent = stripe.PaymentIntent.create(
                amount=int(Decimal(order_data["total"]) * 100),  # amount in cents
                currency="usd",
                payment_method=order_data["payment_method_id"],
                confirm=True
            )
            
            span.set_data("payment.processing_time_ms", 
                             (time.time() - payment_start) * 1000)
            span.set_data("payment.transaction_id", payment_intent.id)
            span.set_data("payment.success", payment_intent.status == "succeeded")
            
            
            # Create the fulfillment order (create_fulfillment is an app-specific helper)
            fulfillment = create_fulfillment(order_data["order_id"])
            span.set_data("fulfillment.id", fulfillment["id"])
            span.set_data("fulfillment.warehouse", fulfillment["warehouse"])
            span.set_data("fulfillment.shipping_method", fulfillment["shipping_method"])
            span.set_data("fulfillment.estimated_delivery", 
                             fulfillment["estimated_delivery"].isoformat())
            
            return {
                "success": True,
                "order_id": order_data["order_id"],
                "payment_id": payment_intent.id,
                "fulfillment_id": fulfillment["id"]
            }
            
        except Exception as e:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("error.message", str(e))
            span.set_data("order.success", False)
            raise

How the Trace Works Together: The client application span tracks the initial order request, while the server span handles order processing and fulfillment. The distributed trace provides visibility into the entire purchase flow, allowing you to see exactly where an order stalls or fails, whether in validation, inventory, payment, or fulfillment.
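To make revenue impact directly queryable, it can help to attach the order value to the checkout span as well. A small, hypothetical addition to CheckoutView.post above (the attribute names and session keys are illustrative):

# Inside CheckoutView.post, before clearing the cart
span.set_data("order.total_usd", float(request.session.get("cart_total", 0)))
span.set_data("order.item_count", len(request.session.get("cart", [])))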

Data Processing Pipeline

Challenge: Understanding performance and reliability of distributed data processing pipelines, from job submission through completion.

Solution: Comprehensive tracking of job lifecycle across queue management, processing stages, and worker performance.

Client Application Instrumentation:

import sentry_sdk
import os
import psutil
from celery import shared_task

def submit_processing_job(data_file_path, priority="medium"):
    with sentry_sdk.start_span(op="job", name="Process Job") as span:
        
        try:
            # Submit the job to the Celery task queue
            task = process_data_file.apply_async(
                args=[str(data_file_path)],
                kwargs={"priority": priority},
                queue="data_processing"
            )

            span.set_data("job.id", task.id)
            span.set_data("job.submission_success", True)

            # Register external status monitoring
            # (setup_task_monitoring is an app-specific helper)
            monitoring_result = setup_task_monitoring(task.id)
            span.set_data("monitor.callback_url", monitoring_result["callback_url"])
            
            return {
                "job_id": task.id,
                "status": "submitted",
                "monitoring_url": monitoring_result["status_url"]
            }
            
        except Exception as e:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("job.submission_success", False)
            span.set_data("error.message", str(e))
            raise

@shared_task(name="tasks.process_data_file")
def process_data_file(file_path, priority="medium"):
    with sentry_sdk.start_span(
        op="process",
        name="Process Data"
    ) as span:
        current_stage = None
        try:
            # Walk the file through each processing stage
            # (parse_data_file, transform_data, validate_data, and
            # export_processed_data are app-specific helpers)
            current_stage = "parse"
            span.set_data("processing.current_stage", current_stage)
            data = parse_data_file(file_path)

            current_stage = "transform"
            span.set_data("processing.current_stage", current_stage)
            transformed_data = transform_data(data)

            current_stage = "validate"
            span.set_data("processing.current_stage", current_stage)
            validation_result = validate_data(transformed_data)
            span.set_data("validation.errors_count", len(validation_result["errors"]))

            current_stage = "export"
            span.set_data("processing.current_stage", current_stage)
            export_result = export_processed_data(transformed_data)

            # Capture worker resource usage at completion
            span.set_data("resource.cpu_percent", psutil.cpu_percent())
            span.set_data("resource.memory_used_mb",
                          psutil.Process().memory_info().rss / (1024 * 1024))

            # Record the job outcome
            span.set_data("outcome.status", "completed")
            span.set_data("outcome.records_processed", len(data))
            span.set_data("outcome.output_size_bytes",
                          os.path.getsize(export_result["output_path"]))

            return {
                "success": True,
                "records_processed": len(data),
                "output_path": export_result["output_path"]
            }

        except Exception as e:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("outcome.status", "failed")
            span.set_data("error.message", str(e))
            # The stage is tracked in a local variable because the SDK does not
            # expose a public getter for data previously set on a span
            span.set_data("error.stage", current_stage)
            raise

How the Trace Works Together: This example shows both the job submission and the processing as a single trace, which is common in Celery and similar distributed task patterns. The spans track the entire job lifecycle, enabling you to see how long each processing stage takes, which stage a failed job died in, and what resources the worker consumed.
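For the producer and worker spans to land in the same distributed trace, the SDK's Celery integration needs to be active in both processes. It is enabled automatically when the celery package is installed, but can also be configured explicitly; a minimal sketch:

import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    traces_sample_rate=1.0,
    integrations=[CeleryIntegration()],
)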

For more information about implementing these examples effectively, see our Span Metrics guide, which includes detailed best practices and implementation guidelines.
