Showing content from https://pkg.go.dev/github.com/jurabek/otelkafka below:

otelkafka package - github.com/jurabek/otelkafka - Go Packages

otelkafka

OpenTelemetry instrumentation for confluent-kafka-go.

Installation
go get -u github.com/jurabek/otelkafka
Usage
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/jurabek/otelkafka"
    "go.opentelemetry.io/otel"
)

func main() {
    // Create a new instrumented Kafka producer
    producer, err := otelkafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
    })
    if err != nil {
        log.Fatalf("Failed to create producer: %v", err)
    }
    defer producer.Close()

    // Build the message to produce
    topic := "my-topic"
    message := &kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte("Hello, World!"),
    }

    // Inject the trace context into the message headers before producing,
    // so that consumers can continue the same trace.
    // context.Background() carries no span; in real code, pass the context
    // that holds the active span.
    ctx := context.Background()
    otel.GetTextMapPropagator().Inject(ctx, otelkafka.NewMessageCarrier(message))
    if err := producer.Produce(message, nil); err != nil {
        log.Fatalf("Failed to produce message: %v", err)
    }

    // Wait up to 15 seconds (the timeout is in milliseconds) for delivery
    producer.Flush(15 * 1000)

    // Create a new Kafka consumer
    consumer, err := otelkafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "my-group",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        log.Fatalf("Failed to create consumer: %v", err)
    }
    defer consumer.Close()

    // Subscribe to the topic
    if err := consumer.Subscribe(topic, nil); err != nil {
        log.Fatalf("Failed to subscribe to topic: %v", err)
    }

    // Consume messages
    for {
        message, err := consumer.ReadMessage(-1)
        if err != nil {
            log.Fatalf("Failed to read message: %v", err)
        }
        fmt.Printf("Message: %s\n", message.Value)
    }
}
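The Inject call above writes the trace context into the message headers through a carrier. To illustrate the idea, here is a minimal, standard-library-only sketch of what such a carrier might look like. The `Header`, `Message`, and `HeaderCarrier` types are hypothetical stand-ins invented for this example; they are not the types from otelkafka or confluent-kafka-go, and the library's actual implementation may differ. The method set (Get, Set, Keys) mirrors the shape OpenTelemetry text-map propagators expect from a carrier.

```go
package main

import (
	"fmt"
	"strings"
)

// Header is a hypothetical stand-in for a Kafka message header.
type Header struct {
	Key   string
	Value []byte
}

// Message is a hypothetical stand-in for a Kafka message; only headers matter here.
type Message struct {
	Headers []Header
}

// HeaderCarrier adapts message headers to the Get/Set/Keys method set
// that a text-map propagator reads from and writes to.
type HeaderCarrier struct {
	msg *Message
}

// Get returns the value of the first header matching key
// (case-insensitive), or "" if there is none.
func (c HeaderCarrier) Get(key string) string {
	for _, h := range c.msg.Headers {
		if strings.EqualFold(h.Key, key) {
			return string(h.Value)
		}
	}
	return ""
}

// Set removes any existing header with the same key and appends the new
// one, so repeated injection does not leave stale trace context behind.
func (c HeaderCarrier) Set(key, value string) {
	kept := c.msg.Headers[:0]
	for _, h := range c.msg.Headers {
		if !strings.EqualFold(h.Key, key) {
			kept = append(kept, h)
		}
	}
	c.msg.Headers = append(kept, Header{Key: key, Value: []byte(value)})
}

// Keys lists the keys of all headers currently on the message.
func (c HeaderCarrier) Keys() []string {
	keys := make([]string, 0, len(c.msg.Headers))
	for _, h := range c.msg.Headers {
		keys = append(keys, h.Key)
	}
	return keys
}

func main() {
	msg := &Message{}
	carrier := HeaderCarrier{msg: msg}

	// Producer side: a propagator would write a W3C traceparent header.
	carrier.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")

	// Consumer side: a propagator would read it back to rebuild the span context.
	fmt.Println(carrier.Get("traceparent"))
	fmt.Println(carrier.Keys())
}
```

On the consuming side, the same kind of carrier would be passed to the propagator's Extract method to recover the producer's trace context from a received message.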
Metrics list

The table below lists the metrics collected by the instrumentation, which can be exported using an OpenTelemetry metric exporter.

| Name | Description | Type | Attributes |
| --- | --- | --- | --- |
| messaging.client.sent.messages | The total number of messages sent by the producer. | Counter | more attributes |
| messaging.client.consumed.messages | The total number of messages received by the consumer. | Counter | more attributes |
| messaging.client.operation.duration | The duration of the messaging operation initiated by a producer or consumer client. Filtered by messaging.system.operation.name. | Histogram | more attributes |
