Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later). It includes a high-level API for easily producing and consuming messages, and a low-level API for controlling bytes on the wire when the high-level API is insufficient. Usage examples for the high-level APIs are provided inline with their full documentation.
To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
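For illustration, here is a minimal SyncProducer sketch (assuming a broker at localhost:9092 and a "my_topic" topic); the SyncProducer requires Producer.Return.Successes to be true, and RequiredAcks controls the durability trade-off mentioned above.

config := NewConfig()
config.Producer.RequiredAcks = WaitForAll // wait for all in-sync replicas to acknowledge
config.Producer.Return.Successes = true   // required by the SyncProducer

producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	log.Fatalln(err)
}
defer func() {
	if err := producer.Close(); err != nil {
		log.Println("Failed to close producer:", err)
	}
}()

partition, offset, err := producer.SendMessage(&ProducerMessage{
	Topic: "my_topic",
	Value: StringEncoder("hello"),
})
if err != nil {
	log.Println("Failed to send message:", err)
	return
}
log.Printf("Message stored at partition %d, offset %d", partition, offset)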
To consume messages, use the Consumer or ConsumerGroup APIs.
For lower-level needs, the Broker and Request/Response objects permit precise control over each connection and message sent on the wire; the Client provides higher-level metadata management that is shared between the producers and the consumer. The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
Metrics are exposed through the https://github.com/rcrowley/go-metrics library in a local registry.
Broker related metrics:
+----------------------------------------------------------+------------+----------------------------------------------------------------+
| Name                                                     | Type       | Description                                                    |
+----------------------------------------------------------+------------+----------------------------------------------------------------+
| incoming-byte-rate                                       | meter      | Bytes/second read off all brokers                              |
| incoming-byte-rate-for-broker-<broker-id>                | meter      | Bytes/second read off a given broker                           |
| outgoing-byte-rate                                       | meter      | Bytes/second written off all brokers                           |
| outgoing-byte-rate-for-broker-<broker-id>                | meter      | Bytes/second written off a given broker                        |
| request-rate                                             | meter      | Requests/second sent to all brokers                            |
| request-rate-for-broker-<broker-id>                      | meter      | Requests/second sent to a given broker                         |
| request-size                                             | histogram  | Distribution of the request size in bytes for all brokers      |
| request-size-for-broker-<broker-id>                      | histogram  | Distribution of the request size in bytes for a given broker   |
| request-latency-in-ms                                    | histogram  | Distribution of the request latency in ms for all brokers      |
| request-latency-in-ms-for-broker-<broker-id>             | histogram  | Distribution of the request latency in ms for a given broker   |
| response-rate                                            | meter      | Responses/second received from all brokers                     |
| response-rate-for-broker-<broker-id>                     | meter      | Responses/second received from a given broker                  |
| response-size                                            | histogram  | Distribution of the response size in bytes for all brokers     |
| response-size-for-broker-<broker-id>                     | histogram  | Distribution of the response size in bytes for a given broker  |
| requests-in-flight                                       | counter    | The current number of in-flight requests awaiting a response   |
|                                                          |            | for all brokers                                                |
| requests-in-flight-for-broker-<broker-id>                | counter    | The current number of in-flight requests awaiting a response   |
|                                                          |            | for a given broker                                             |
| protocol-requests-rate-<api-key>                         | meter      | Number of api requests sent to the brokers for all brokers     |
|                                                          |            | https://kafka.apache.org/protocol.html#protocol_api_keys       |
| protocol-requests-rate-<api-key>-for-broker-<broker-id>  | meter      | Number of packets sent to the brokers by api-key for a given   |
|                                                          |            | broker                                                         |
+----------------------------------------------------------+------------+----------------------------------------------------------------+
Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.
Producer related metrics:
+-------------------------------------------+------------+----------------------------------------------------------------------------------------+
| Name                                      | Type       | Description                                                                            |
+-------------------------------------------+------------+----------------------------------------------------------------------------------------+
| batch-size                                | histogram  | Distribution of the number of bytes sent per partition per request for all topics     |
| batch-size-for-topic-<topic>              | histogram  | Distribution of the number of bytes sent per partition per request for a given topic  |
| record-send-rate                          | meter      | Records/second sent to all topics                                                      |
| record-send-rate-for-topic-<topic>        | meter      | Records/second sent to a given topic                                                   |
| records-per-request                       | histogram  | Distribution of the number of records sent per request for all topics                 |
| records-per-request-for-topic-<topic>     | histogram  | Distribution of the number of records sent per request for a given topic              |
| compression-ratio                         | histogram  | Distribution of the compression ratio times 100 of record batches for all topics      |
| compression-ratio-for-topic-<topic>       | histogram  | Distribution of the compression ratio times 100 of record batches for a given topic   |
+-------------------------------------------+------------+----------------------------------------------------------------------------------------+
Consumer related metrics:
+-------------------------------------------+------------+------------------------------------------------------+
| Name                                      | Type       | Description                                          |
+-------------------------------------------+------------+------------------------------------------------------+
| consumer-batch-size                       | histogram  | Distribution of the number of messages in a batch    |
| consumer-fetch-rate                       | meter      | Fetch requests/second sent to all brokers            |
| consumer-fetch-rate-for-broker-<broker>   | meter      | Fetch requests/second sent to a given broker         |
| consumer-fetch-rate-for-topic-<topic>     | meter      | Fetch requests/second sent for a given topic         |
| consumer-fetch-response-size              | histogram  | Distribution of the fetch response size in bytes     |
| consumer-group-join-total-<GroupID>       | counter    | Total count of consumer group join attempts          |
| consumer-group-join-failed-<GroupID>      | counter    | Total count of consumer group join failures          |
| consumer-group-sync-total-<GroupID>       | counter    | Total count of consumer group sync attempts          |
| consumer-group-sync-failed-<GroupID>      | counter    | Total count of consumer group sync failures          |
+-------------------------------------------+------------+------------------------------------------------------+
const (
	RangeBalanceStrategyName      = "range"
	RoundRobinBalanceStrategyName = "roundrobin"
	StickyBalanceStrategyName     = "sticky"
)
const (
	SASLTypeOAuth       = "OAUTHBEARER"
	SASLTypePlaintext   = "PLAIN"
	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
	SASLTypeGSSAPI      = "GSSAPI"
	SASLHandshakeV0     = int16(0)
	SASLHandshakeV1     = int16(1)
	SASLExtKeyAuth      = "auth"
)
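As a hedged sketch, these constants are typically used through the Net.SASL section of the Config; the broker address and credentials below are placeholders, and SCRAM mechanisms additionally need a SCRAMClientGeneratorFunc.

config := NewConfig()
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = SASLTypePlaintext // or SASLTypeSCRAMSHA256 / SASLTypeSCRAMSHA512 with a SCRAM client generator
config.Net.SASL.Handshake = true
config.Net.SASL.User = "username" // placeholder credentials
config.Net.SASL.Password = "secret"

client, err := NewClient([]string{"broker-1:9093"}, config)
if err != nil {
	log.Fatalln(err)
}
defer client.Close()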
const (
	TOK_ID_KRB_AP_REQ   = 256
	GSS_API_GENERIC_TAG = 0x60
	KRB5_USER_AUTH      = 1
	KRB5_KEYTAB_AUTH    = 2
	KRB5_CCACHE_AUTH    = 3
	GSS_API_INITIAL     = 1
	GSS_API_VERIFY      = 2
	GSS_API_FINISH      = 3
)
APIKeySASLAuth is the API key for the SaslAuthenticate Kafka API
GroupGenerationUndefined is a special value for the group generation field of Offset Commit Requests that should be used when a consumer group does not rely on Kafka for partition management.
ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received. The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
var (
	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
	V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
	V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
	V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
	V0_10_2_2 = newKafkaVersion(0, 10, 2, 2)
	V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
	V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
	V1_0_1_0  = newKafkaVersion(1, 0, 1, 0)
	V1_0_2_0  = newKafkaVersion(1, 0, 2, 0)
	V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
	V1_1_1_0  = newKafkaVersion(1, 1, 1, 0)
	V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)
	V2_0_1_0  = newKafkaVersion(2, 0, 1, 0)
	V2_1_0_0  = newKafkaVersion(2, 1, 0, 0)
	V2_1_1_0  = newKafkaVersion(2, 1, 1, 0)
	V2_2_0_0  = newKafkaVersion(2, 2, 0, 0)
	V2_2_1_0  = newKafkaVersion(2, 2, 1, 0)
	V2_2_2_0  = newKafkaVersion(2, 2, 2, 0)
	V2_3_0_0  = newKafkaVersion(2, 3, 0, 0)
	V2_3_1_0  = newKafkaVersion(2, 3, 1, 0)
	V2_4_0_0  = newKafkaVersion(2, 4, 0, 0)
	V2_4_1_0  = newKafkaVersion(2, 4, 1, 0)
	V2_5_0_0  = newKafkaVersion(2, 5, 0, 0)
	V2_5_1_0  = newKafkaVersion(2, 5, 1, 0)
	V2_6_0_0  = newKafkaVersion(2, 6, 0, 0)
	V2_6_1_0  = newKafkaVersion(2, 6, 1, 0)
	V2_6_2_0  = newKafkaVersion(2, 6, 2, 0)
	V2_6_3_0  = newKafkaVersion(2, 6, 3, 0)
	V2_7_0_0  = newKafkaVersion(2, 7, 0, 0)
	V2_7_1_0  = newKafkaVersion(2, 7, 1, 0)
	V2_7_2_0  = newKafkaVersion(2, 7, 2, 0)
	V2_8_0_0  = newKafkaVersion(2, 8, 0, 0)
	V2_8_1_0  = newKafkaVersion(2, 8, 1, 0)
	V2_8_2_0  = newKafkaVersion(2, 8, 2, 0)
	V3_0_0_0  = newKafkaVersion(3, 0, 0, 0)
	V3_0_1_0  = newKafkaVersion(3, 0, 1, 0)
	V3_0_2_0  = newKafkaVersion(3, 0, 2, 0)
	V3_1_0_0  = newKafkaVersion(3, 1, 0, 0)
	V3_1_1_0  = newKafkaVersion(3, 1, 1, 0)
	V3_1_2_0  = newKafkaVersion(3, 1, 2, 0)
	V3_2_0_0  = newKafkaVersion(3, 2, 0, 0)
	V3_2_1_0  = newKafkaVersion(3, 2, 1, 0)
	V3_2_2_0  = newKafkaVersion(3, 2, 2, 0)
	V3_2_3_0  = newKafkaVersion(3, 2, 3, 0)
	V3_3_0_0  = newKafkaVersion(3, 3, 0, 0)
	V3_3_1_0  = newKafkaVersion(3, 3, 1, 0)
	V3_3_2_0  = newKafkaVersion(3, 3, 2, 0)
	V3_4_0_0  = newKafkaVersion(3, 4, 0, 0)
	V3_4_1_0  = newKafkaVersion(3, 4, 1, 0)
	V3_5_0_0  = newKafkaVersion(3, 5, 0, 0)
	V3_5_1_0  = newKafkaVersion(3, 5, 1, 0)
	V3_5_2_0  = newKafkaVersion(3, 5, 2, 0)
	V3_6_0_0  = newKafkaVersion(3, 6, 0, 0)
	V3_6_1_0  = newKafkaVersion(3, 6, 1, 0)
	V3_6_2_0  = newKafkaVersion(3, 6, 2, 0)
	V3_7_0_0  = newKafkaVersion(3, 7, 0, 0)
	V3_7_1_0  = newKafkaVersion(3, 7, 1, 0)
	V3_7_2_0  = newKafkaVersion(3, 7, 2, 0)
	V3_8_0_0  = newKafkaVersion(3, 8, 0, 0)
	V3_8_1_0  = newKafkaVersion(3, 8, 1, 0)
	V3_9_0_0  = newKafkaVersion(3, 9, 0, 0)
	V4_0_0_0  = newKafkaVersion(4, 0, 0, 0)

	SupportedVersions = []KafkaVersion{
		V0_8_2_0, V0_8_2_1, V0_8_2_2,
		V0_9_0_0, V0_9_0_1,
		V0_10_0_0, V0_10_0_1, V0_10_1_0, V0_10_1_1, V0_10_2_0, V0_10_2_1, V0_10_2_2,
		V0_11_0_0, V0_11_0_1, V0_11_0_2,
		V1_0_0_0, V1_0_1_0, V1_0_2_0, V1_1_0_0, V1_1_1_0,
		V2_0_0_0, V2_0_1_0, V2_1_0_0, V2_1_1_0, V2_2_0_0, V2_2_1_0, V2_2_2_0,
		V2_3_0_0, V2_3_1_0, V2_4_0_0, V2_4_1_0, V2_5_0_0, V2_5_1_0,
		V2_6_0_0, V2_6_1_0, V2_6_2_0, V2_6_3_0, V2_7_0_0, V2_7_1_0, V2_7_2_0,
		V2_8_0_0, V2_8_1_0, V2_8_2_0,
		V3_0_0_0, V3_0_1_0, V3_0_2_0, V3_1_0_0, V3_1_1_0, V3_1_2_0,
		V3_2_0_0, V3_2_1_0, V3_2_2_0, V3_2_3_0, V3_3_0_0, V3_3_1_0, V3_3_2_0,
		V3_4_0_0, V3_4_1_0, V3_5_0_0, V3_5_1_0, V3_5_2_0,
		V3_6_0_0, V3_6_1_0, V3_6_2_0, V3_7_0_0, V3_7_1_0, V3_7_2_0,
		V3_8_0_0, V3_8_1_0, V3_9_0_0,
		V4_0_0_0,
	}

	MinVersion     = V0_8_2_0
	MaxVersion     = V4_0_0_0
	DefaultVersion = V2_1_0_0
)
Effective constants defining the supported kafka versions.
Deprecated: use NewBalanceStrategyRange to avoid data race issue
Deprecated: use NewBalanceStrategyRoundRobin to avoid data race issue
Deprecated: use NewBalanceStrategySticky to avoid data race issue
var ErrAddPartitionsToTxn = errors.New("transaction manager: failed to send partitions to transaction")
ErrAddPartitionsToTxn is returned when AddPartitionsToTxn has failed multiple times.
ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
ErrBrokerNotFound is the error returned when there's no broker found for the requested ID.
var ErrCannotTransitionNilError = errors.New("transaction manager: cannot transition with a nil error")
ErrCannotTransitionNilError is returned when a transition is attempted with a nil error.
ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")
ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")
ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing a RecordBatch.
ErrControllerNotAvailable is returned when the server does not return a valid controller ID. This may happen if the Kafka broker's version is lower than 0.10.0.0.
ErrCreateACLs is the type of error returned when ACL creation fails.
ErrDeleteRecords is the type of error returned when deleting the requested records fails.
var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")
ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")
ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.
var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")
ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).
ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")
ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update the metadata.
var ErrNonTransactedProducer = errors.New("transaction manager: you need to add TransactionalID to producer")
ErrNonTransactedProducer is returned when calling BeginTxn, CommitTxn or AbortTxn on a non-transactional producer.
ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to")
ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.
var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow")
ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied.
ErrReassignPartitions is returned when altering partition assignments for a topic fails
var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")
ErrShuttingDown is returned when a producer receives a message during shutdown.
var ErrTransactionNotReady = errors.New("transaction manager: transaction is not ready")
ErrTransactionNotReady is returned when the transaction status is invalid for the current action.
var ErrTransitionNotAllowed = errors.New("transaction manager: invalid transition attempted")
ErrTransitionNotAllowed is returned when a transaction manager state transition is not valid.
var ErrTxnOffsetCommit = errors.New("transaction manager: failed to send offsets to transaction")
ErrTxnOffsetCommit is returned when TxnOffsetCommit failed multiple times
var ErrTxnUnableToParseResponse = errors.New("transaction manager: unable to parse response")
ErrTxnUnableToParseResponse is returned when the response is nil.
ErrUnknownScramMechanism is returned when a user calls AlterUserScramCredentials with an unknown SCRAM mechanism.
MultiErrorFormat specifies the formatter applied to format multierrors. The default implementation is a condensed version of the hashicorp/go-multierror default one
var NullUUID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
NewExponentialBackoff returns a function that implements an exponential backoff strategy with jitter. It follows KIP-580, implementing the formula: MIN(retry.backoff.max.ms, (retry.backoff.ms * 2**(failures - 1)) * random(0.8, 1.2)) This ensures retries start with `backoff` and exponentially increase until `maxBackoff`, with added jitter. The behavior when `failures = 0` is not explicitly defined in KIP-580 and is left to implementation discretion.
Example usage:
backoffFunc := sarama.NewExponentialBackoff(config.Producer.Retry.Backoff, 2*time.Second)
config.Producer.Retry.BackoffFunc = backoffFunc
func Wrap(sentinel error, wrapped ...error) sentinelError
type AbortedTransaction struct { ProducerID int64 FirstOffset int64 }
AccessToken contains an access token used to authenticate a SASL/OAUTHBEARER client along with associated metadata.
AccessTokenProvider is the interface that encapsulates how implementors can generate access tokens for Kafka broker authentication.
Acl holds information about acl type
AclCreation is a wrapper around Resource and Acl type
type AclCreationResponse struct { Err KError ErrMsg *string }
AclCreationResponse is an acl creation response type
MarshalText returns the text form of the AclOperation (name without prefix)
UnmarshalText takes a text representation of the operation and converts it to an AclOperation
type AclPermissionType int
MarshalText returns the text form of the AclPermissionType (name without prefix)
UnmarshalText takes a text representation of the permission type and converts it to an AclPermissionType
type AclResourcePatternType int
MarshalText returns the text form of the AclResourcePatternType (name without prefix)
UnmarshalText takes a text representation of the resource pattern type and converts it to an AclResourcePatternType
MarshalText returns the text form of the AclResourceType (name without prefix)
UnmarshalText takes a text representation of the resource type and converts it to an AclResourceType
AddOffsetsToTxnRequest is a request to add offsets to a transaction.
AddOffsetsToTxnResponse is a response type for adding offsets to txns
AddPartitionsToTxnRequest is a request to add partitions to a transaction.
AddPartitionsToTxnResponse is the response to an AddPartitionsToTxnRequest, carrying per-partition errors.
AlterConfigsRequest is an alter config request type
AlterConfigsResource is an alter config resource type
AlterConfigsResourceResponse is a response type for alter config resource
AlterConfigsResponse is a response type for alter config
type AlterPartitionReassignmentsRequest struct { TimeoutMs int32 Version int16 }
type AlterPartitionReassignmentsResponse struct { Version int16 ThrottleTimeMs int32 ErrorCode KError ErrorMessage *string Errors map[string]map[int32]*alterPartitionReassignmentsErrorBlock }
type AlterUserScramCredentialsResult struct { User string ErrorCode KError ErrorMessage *string }
type ApiVersionsRequest struct { Version int16 ClientSoftwareName string ClientSoftwareVersion string }
ApiVersionsResponseKey contains the APIs supported by the broker.
AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid leaks and message loss: it will not be garbage-collected automatically when it passes out of scope and buffered messages may not be flushed.
This example shows how to use the producer with separate goroutines reading from the Successes and Errors channels. Note that in order for the Successes channel to be populated, you have to set config.Producer.Return.Successes to true.
config := NewTestConfig()
config.Producer.Return.Successes = true
producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	panic(err)
}

// Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var (
	wg                                  sync.WaitGroup
	enqueued, successes, producerErrors int
)

wg.Add(1)
go func() {
	defer wg.Done()
	for range producer.Successes() {
		successes++
	}
}()

wg.Add(1)
go func() {
	defer wg.Done()
	for err := range producer.Errors() {
		log.Println(err)
		producerErrors++
	}
}()

ProducerLoop:
for {
	message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
	select {
	case producer.Input() <- message:
		enqueued++
	case <-signals:
		producer.AsyncClose() // Trigger a shutdown of the producer.
		break ProducerLoop
	}
}

wg.Wait()

log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
This example shows how to use the producer while simultaneously reading the Errors channel to know about any failures.
producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
	panic(err)
}

defer func() {
	if err := producer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var enqueued, producerErrors int
ProducerLoop:
for {
	select {
	case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
		enqueued++
	case err := <-producer.Errors():
		log.Println("Failed to produce message", err)
		producerErrors++
	case <-signals:
		break ProducerLoop
	}
}

log.Printf("Enqueued: %d; errors: %d\n", enqueued, producerErrors)
NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
NewAsyncProducerFromClient creates a new Producer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.
BalanceStrategy is used to balance topics and partitions across members of a consumer group
NewBalanceStrategyRange returns a range balance strategy, which is the default and assigns partitions as ranges to consumer group members. This follows the same logic as https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
Example with two topics T1 and T2 with six partitions each (0..5) and two members (M1, M2):
M1: {T1: [0, 1, 2], T2: [0, 1, 2]}
M2: {T1: [3, 4, 5], T2: [3, 4, 5]}
NewBalanceStrategyRoundRobin returns a round-robin balance strategy, which assigns partitions to members in alternating order. For example, given two topics (t0, t1) and two consumers (m0, m1), where each topic has three partitions (p0, p1, p2):

M0: [t0p0, t0p2, t1p1]
M1: [t0p1, t1p0, t1p2]
NewBalanceStrategySticky returns a sticky balance strategy, which assigns partitions to members with an attempt to preserve earlier assignments while maintaining a balanced partition distribution. Example with topic T with six partitions (0..5) and two members (M1, M2):
M1: {T: [0, 2, 4]}
M2: {T: [1, 3, 5]}
On reassignment with an additional consumer, you might get an assignment plan like:
M1: {T: [0, 2]}
M2: {T: [1, 3]}
M3: {T: [4, 5]}
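A brief sketch of wiring a strategy into the consumer group configuration (assuming the Consumer.Group.Rebalance.GroupStrategies field available in newer releases; older code sets the single Rebalance.Strategy field instead):

config := NewConfig()
config.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{
	NewBalanceStrategySticky(), // or NewBalanceStrategyRange(), NewBalanceStrategyRoundRobin()
}

group, err := NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
if err != nil {
	log.Fatalln(err)
}
defer group.Close()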
BalanceStrategyPlan is the results of any BalanceStrategy.Plan attempt. It contains an allocation of topic/partitions by memberID in the form of a `memberID -> topic -> partitions` map.
Add assigns a topic with a number partitions to a member.
Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
broker := NewBroker("localhost:9092")
err := broker.Open(nil)
if err != nil {
	panic(err)
}

request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata(&request)
if err != nil {
	_ = broker.Close()
	panic(err)
}

fmt.Println("There are", len(response.Topics), "topics active in the cluster.")

if err = broker.Close(); err != nil {
	panic(err)
}
NewBroker creates and returns a Broker targeting the given host:port address. This does not attempt to actually connect; you have to call Open() for that.
AddOffsetsToTxn sends a request to add offsets to txn and returns a response or error
AddPartitionsToTxn sends a request to add partitions to a transaction and returns a response or error
Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
AlterClientQuotas sends a request to alter the broker's quotas
AlterConfigs sends a request to alter config and returns a response or error
AlterPartitionReassignments sends an alter partition reassignments request and returns an alter partition reassignments response
ApiVersions returns an API versions response or error
AsyncProduce sends a produce request and eventually calls the provided callback with a produce response or an error.
Unlike Produce, this call generally does not block while waiting for the response. However, if the configured maximum number of in-flight requests has been reached, the call blocks until a previous response is received.
When configured with RequiredAcks == NoResponse, the callback will not be invoked. If an error is returned because the request could not be sent then the callback will not be invoked either.
Make sure not to Close the broker in the callback as it will lead to a deadlock.
Close closes the broker resources
CommitOffset returns an offset commit response or error
Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.
CreateAcls sends a create acl request and returns a response or error
CreatePartitions sends a create partitions request and returns a create partitions response or error
CreateTopics sends a create topics request and returns a create topics response
DeleteAcls sends a delete acl request and returns a response or error
DeleteGroups sends a request to delete groups and returns a response or error
DeleteOffsets sends a request to delete group offsets and returns a response or error
DeleteRecords sends a request to delete records and returns a delete records response or error
DeleteTopics sends a delete topics request and returns a delete topics response
DescribeAcls sends a describe acl request and returns a response or error
DescribeClientQuotas sends a request to get the broker's quotas
DescribeConfigs sends a request to describe config and returns a response or error
DescribeGroups returns a describe groups response or error
DescribeLogDirs sends a request to get the broker's log dir paths and sizes
DescribeUserScramCredentials sends a request to get SCRAM users
ElectLeaders sends an elect leaders request and returns the per-partition election results
EndTxn sends a request to end txn and returns a response or error
Fetch returns a FetchResponse or error
FetchOffset returns an offset fetch response or error
FindCoordinator sends a find coordinator request and returns a response or error
GetAvailableOffsets returns an offset response or error
GetConsumerMetadata sends a consumer metadata request and returns a consumer metadata response or error
GetMetadata sends a metadata request and returns a metadata response or error
Heartbeat returns a heartbeat response or error
ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
IncrementalAlterConfigs sends a request to incremental alter config and return a response or error
InitProducerID sends an init producer request and returns a response or error
JoinGroup returns a join group response or error
LeaveGroup returns a leave group response or error
ListGroups returns a list groups response or error
ListPartitionReassignments sends a list partition reassignments request and returns a list partition reassignments response
Open tries to connect to the Broker if it is not already connected or connecting, but does not block waiting for the connection to complete. This means that any subsequent operations on the broker will block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of NewConfig() is used.
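A small sketch of pairing Open with Connected to get the effect of a synchronous connect, as described above:

broker := NewBroker("localhost:9092")
if err := broker.Open(nil); err != nil { // nil config means NewConfig() defaults
	log.Fatalln(err)
}
connected, err := broker.Connected() // blocks until the connection attempt completes
if err != nil {
	log.Fatalln("connection attempt failed:", err)
}
log.Println("connected:", connected)
_ = broker.Close()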
Produce returns a produce response or error
Rack returns the broker's rack as retrieved from Kafka's metadata or the empty string if it is not known. The returned value corresponds to the broker's broker.rack configuration setting. Requires protocol version to be at least v0.10.0.0.
SyncGroup returns a sync group response or error
TLSConnectionState returns the client's TLS connection state. The second return value is false if this is not a tls connection or the connection has not yet been established.
TxnOffsetCommit sends a request to commit transaction offsets and returns a response or error
ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.
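For example, a raw byte slice can be used directly as a message value:

msg := &ProducerMessage{
	Topic: "my_topic",
	Key:   StringEncoder("key-1"),
	Value: ByteEncoder([]byte{0x01, 0x02, 0x03}),
}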
type Client interface {
	Config() *Config
	Controller() (*Broker, error)
	RefreshController() (*Broker, error)
	Brokers() []*Broker
	Broker(brokerID int32) (*Broker, error)
	Topics() ([]string, error)
	Partitions(topic string) ([]int32, error)
	WritablePartitions(topic string) ([]int32, error)
	Leader(topic string, partitionID int32) (*Broker, error)
	LeaderAndEpoch(topic string, partitionID int32) (*Broker, int32, error)
	Replicas(topic string, partitionID int32) ([]int32, error)
	InSyncReplicas(topic string, partitionID int32) ([]int32, error)
	OfflineReplicas(topic string, partitionID int32) ([]int32, error)
	RefreshBrokers(addrs []string) error
	RefreshMetadata(topics ...string) error
	GetOffset(topic string, partitionID int32, time int64) (int64, error)
	Coordinator(consumerGroup string) (*Broker, error)
	RefreshCoordinator(consumerGroup string) error
	TransactionCoordinator(transactionID string) (*Broker, error)
	RefreshTransactionCoordinator(transactionID string) error
	InitProducerID() (*InitProducerIDResponse, error)
	LeastLoadedBroker() *Broker
	PartitionNotReadable(topic string, partition int32) bool
	Close() error
	Closed() bool
}
Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks, it will not be garbage-collected automatically when it passes out of scope. It is safe to share a client amongst many users, however Kafka will process requests from a single client strictly in serial, so it is generally more efficient to use the default one client per producer/consumer.
NewClient creates a new Client. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.
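A sketch of sharing one Client between a producer and a consumer (the broker address is a placeholder); the client must be closed last:

client, err := NewClient([]string{"localhost:9092"}, NewConfig())
if err != nil {
	log.Fatalln(err)
}
defer client.Close() // close the shared client after its users

producer, err := NewAsyncProducerFromClient(client)
if err != nil {
	log.Fatalln(err)
}
defer producer.Close()

consumer, err := NewConsumerFromClient(client)
if err != nil {
	log.Fatalln(err)
}
defer consumer.Close()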
type ClusterAdmin interface {
	CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
	ListTopics() (map[string]TopicDetail, error)
	DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
	DeleteTopic(topic string) error
	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
	AlterPartitionReassignments(topic string, assignment [][]int32) error
	ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
	DeleteRecords(topic string, partitionOffsets map[int32]int64) error
	DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
	AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
	IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error
	CreateACL(resource Resource, acl Acl) error
	CreateACLs([]*ResourceAcls) error
	ListAcls(filter AclFilter) ([]ResourceAcls, error)
	DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
	ElectLeaders(ElectionType, map[string][]int32) (map[string]map[int32]*PartitionResult, error)
	ListConsumerGroups() (map[string]string, error)
	DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
	DeleteConsumerGroupOffset(group string, topic string, partition int32) error
	DeleteConsumerGroup(group string) error
	DescribeCluster() (brokers []*Broker, controllerID int32, err error)
	DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
	DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)
	DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)
	UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)
	DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)
	AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error
	Controller() (*Broker, error)
	Coordinator(group string) (*Broker, error)
	RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error)
	Close() error
}
ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker version required. You MUST call Close() on a client to avoid leaks
NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
NewClusterAdminFromClient creates a new ClusterAdmin using the given client. Note that the underlying client will also be closed on the admin's Close() call.
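A hedged sketch of creating a topic through a ClusterAdmin; the topic name, partition count and replication factor are illustrative only:

admin, err := NewClusterAdmin([]string{"localhost:9092"}, NewConfig())
if err != nil {
	log.Fatalln(err)
}
defer admin.Close()

err = admin.CreateTopic("my_topic", &TopicDetail{
	NumPartitions:     3,
	ReplicationFactor: 1,
}, false)
if err != nil {
	log.Println("CreateTopic failed:", err)
}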
type CompressionCodec int8
CompressionCodec represents the various compression codecs recognized by Kafka in messages.
const ( CompressionNone CompressionCodec = iota CompressionGZIP CompressionSnappy CompressionLZ4 CompressionZSTD CompressionLevelDefault = -1000 )
MarshalText transforms a CompressionCodec into its string representation.
UnmarshalText returns a CompressionCodec from its string representation.
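For example, compression is enabled on the producer through the Config:

config := NewConfig()
config.Producer.Compression = CompressionSnappy
config.Producer.CompressionLevel = CompressionLevelDefault // codec-specific levels also accepted, e.g. for gzip/zstd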
Config is used to pass multiple configuration options to Sarama's constructors.
This example shows how to integrate with an existing registry as well as publish metrics on standard output.
// Our application registry
appMetricRegistry := metrics.NewRegistry()
appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
appGauge.Update(1)

config := NewTestConfig()
// Use a prefix registry instead of the default local one
config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")

// Simulate a metric created by sarama without starting a broker
saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
saramaGauge.Update(2)

metrics.WriteOnce(appMetricRegistry, os.Stdout)
Output:

gauge m1 value: 1
gauge sarama.m2 value: 2
NewConfig returns a new configuration instance with sane defaults.
Validate checks a Config instance. It will return a ConfigurationError if the specified values don't make sense.
type ConfigResourceType int8
ConfigResourceType is a type for resources that have configs.
const ( SourceUnknown ConfigSource = iota SourceTopic SourceDynamicBroker SourceDynamicDefaultBroker SourceStaticBroker SourceDefault )
type ConfigurationError string
ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.
Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of scope.
This example shows how to use the consumer to read messages from a single partition.
consumer, err := NewConsumer([]string{"localhost:9092"}, NewTestConfig())
if err != nil {
	panic(err)
}

defer func() {
	if err := consumer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
if err != nil {
	panic(err)
}

defer func() {
	if err := partitionConsumer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

consumed := 0
ConsumerLoop:
for {
	select {
	case msg := <-partitionConsumer.Messages():
		log.Printf("Consumed message offset %d\n", msg.Offset)
		consumed++
	case <-signals:
		break ConsumerLoop
	}
}

log.Printf("Consumed: %d\n", consumed)
NewConsumer creates a new consumer using the given broker addresses and configuration.
NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer.
ConsumerError is what is provided to the user when an error occurs. It wraps an error and includes the topic and partition.
ConsumerErrors is a type that wraps a batch of errors and implements the Error interface. It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors when stopping.
ConsumerGroup is responsible for dividing up processing of topics and partitions over a collection of processes (the members of the consumer group).
//go:build !functional

package main

import (
	"context"
	"fmt"
)

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	config := NewTestConfig()
	config.Version = V2_0_0_0 // specify appropriate version
	config.Consumer.Return.Errors = true

	group, err := NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = group.Close() }()

	// Track errors
	go func() {
		for err := range group.Errors() {
			fmt.Println("ERROR", err)
		}
	}()

	// Iterate over consumer sessions.
	ctx := context.Background()
	for {
		topics := []string{"my-topic"}
		handler := exampleConsumerGroupHandler{}

		// `Consume` should be called inside an infinite loop; when a
		// server-side rebalance happens, the consumer session will need to be
		// recreated to get the new claims
		err := group.Consume(ctx, topics, handler)
		if err != nil {
			panic(err)
		}
	}
}
NewConsumerGroup creates a new consumer group using the given broker addresses and configuration.
NewConsumerGroupFromClient creates a new consumer group using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer. PLEASE NOTE: consumer groups can only re-use but not share clients.
ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
ConsumerGroupHandler instances are used to handle individual topic/partition claims. It also provides hooks for your consumer group session life-cycle and allows you to trigger logic before or after the consume loop(s).
PLEASE NOTE that handlers are likely to be called from several goroutines concurrently; ensure that all state is safely protected against race conditions.
ConsumerGroupSession represents a consumer group member session.
ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMetadataRequest struct { Version int16 ConsumerGroup string }
ConsumerMetadataRequest is used for metadata requests
ConsumerMetadataResponse holds the response for a consumer group metadata request
Control records are returned as records by a fetch request. However, unlike "normal" records, they mean nothing application-wise; they only serve internal logic for supporting transactions.
type ControlRecordType int
ControlRecordType ...
type CoordinatorType int8
CreateAclsRequest is an acl creation request
CreateAclsResponse is an ACL creation response type
type DeleteAclsRequest struct { Version int Filters []*AclFilter }
DeleteAclsRequest is a delete acl request
DeleteAclsResponse is a delete acl response
type DeleteGroupsRequest struct { Version int16 Groups []string }
type DeleteOffsetsRequest struct { Version int16 Group string }
type DeleteRecordsRequestTopic struct { PartitionOffsets map[int32]int64 }
type DeleteRecordsResponsePartition struct { LowWatermark int64 Err KError }
DescribeAclsRequest is a describe acl request type
DescribeAclsResponse is a describe acl response type
A filter to be applied to matching client quotas. Components: the components to filter on. Strict: whether the filter only includes specified components.
type DescribeConfigError struct { Err KError ErrMsg string }
type DescribeGroupsRequest struct { Version int16 Groups []string }
DescribeLogDirsRequest is a describe request to get partitions' log size
type DescribeLogDirsRequestTopic struct { Topic string PartitionIDs []int32 }
DescribeLogDirsRequestTopic is a describe request about the log dir of one or more partitions within a Topic
type DescribeLogDirsResponsePartition struct { PartitionID int32 Size int64 OffsetLag int64 IsTemporary bool }
DescribeLogDirsResponsePartition describes a partition's log directory
DescribeLogDirsResponseTopic contains a topic's partitions descriptions
DescribeUserScramCredentialsRequest is a request to get list of SCRAM user names
type DescribeUserScramCredentialsRequestUser struct { Name string }
DescribeUserScramCredentialsRequestUser is a describe request about specific user name
DynamicConsistencyPartitioner can optionally be implemented by Partitioners in order to allow more flexibility than is originally allowed by the RequiresConsistency method in the Partitioner interface. This allows partitioners to require consistency sometimes, but not all times. It's useful for, e.g., the HashPartitioner, which does not require consistency if the message key is nil.
type Encoder interface { Encode() ([]byte, error) Length() int }
Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
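As a sketch, a custom Encoder might marshal a value to JSON once and report its length, satisfying the contract that Length() matches len() of the Encode() result (this assumes the standard encoding/json package; jsonEncoder and newJSONEncoder are illustrative names, not part of this package):

type jsonEncoder struct {
	data []byte
}

func newJSONEncoder(v interface{}) (*jsonEncoder, error) {
	data, err := json.Marshal(v) // marshal once, reuse for both Encode and Length
	if err != nil {
		return nil, err
	}
	return &jsonEncoder{data: data}, nil
}

func (e *jsonEncoder) Encode() ([]byte, error) { return e.data, nil }
func (e *jsonEncoder) Length() int             { return len(e.data) }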
type EndTxnRequest struct { Version int16 TransactionalID string ProducerID int64 ProducerEpoch int16 TransactionResult bool }
AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp, but instead of appending one record to an existing batch, it appends a new batch containing one record to the fetch response. Since transactions are handled at the batch level (the whole batch is either committed or aborted), use this to test transactions.
FilterResponse is a filter response type
This does the handshake for authorization
type GroupData struct { GroupState string }
GroupDescription contains each described group.
type GroupMember struct { MemberId string GroupInstanceId *string Metadata []byte }
GroupMemberDescription contains the group members.
type GroupProtocol struct { Name string Metadata []byte }
type HashPartitionerOption func(*hashPartitioner)
HashPartitionerOption lets you modify default values of the partitioner
WithAbsFirst means that the partitioner handles absolute values in the same way as the reference Java implementation
WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty
WithCustomHashFunction lets you specify what hash function to use for the partitioning
WithHashUnsigned means the partitioner treats the hashed value as unsigned when partitioning. This is intended to be combined with the crc32 hash algorithm to be compatible with librdkafka's implementation
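A hedged sketch combining these options via NewCustomPartitioner (hash/fnv provides the 32-bit FNV-1a hash; the exact option set available depends on the Sarama release):

config := NewConfig()
config.Producer.Partitioner = NewCustomPartitioner(
	WithCustomHashFunction(fnv.New32a), // explicit hash function, from "hash/fnv"
	WithHashUnsigned(),                 // treat the hash as unsigned, librdkafka-compatible
)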
type IncrementalAlterConfigsOperation int8
const ( IncrementalAlterConfigsOperationSet IncrementalAlterConfigsOperation = iota IncrementalAlterConfigsOperationDelete IncrementalAlterConfigsOperationAppend IncrementalAlterConfigsOperationSubtract )
IncrementalAlterConfigsRequest is an incremental alter config request type
IncrementalAlterConfigsResponse is a response type for incremental alter config
KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
const ( ErrUnknown KError = -1 ErrNoError KError = 0 ErrOffsetOutOfRange KError = 1 ErrInvalidMessage KError = 2 ErrUnknownTopicOrPartition KError = 3 ErrInvalidMessageSize KError = 4 ErrLeaderNotAvailable KError = 5 ErrNotLeaderForPartition KError = 6 ErrRequestTimedOut KError = 7 ErrBrokerNotAvailable KError = 8 ErrReplicaNotAvailable KError = 9 ErrMessageSizeTooLarge KError = 10 ErrStaleControllerEpochCode KError = 11 ErrOffsetMetadataTooLarge KError = 12 ErrNetworkException KError = 13 ErrOffsetsLoadInProgress KError = 14 ErrConsumerCoordinatorNotAvailable KError = 15 ErrNotCoordinatorForConsumer KError = 16 ErrInvalidTopic KError = 17 ErrMessageSetSizeTooLarge KError = 18 ErrNotEnoughReplicas KError = 19 ErrNotEnoughReplicasAfterAppend KError = 20 ErrInvalidRequiredAcks KError = 21 ErrIllegalGeneration KError = 22 ErrInconsistentGroupProtocol KError = 23 ErrInvalidGroupId KError = 24 ErrUnknownMemberId KError = 25 ErrInvalidSessionTimeout KError = 26 ErrRebalanceInProgress KError = 27 ErrInvalidCommitOffsetSize KError = 28 ErrTopicAuthorizationFailed KError = 29 ErrGroupAuthorizationFailed KError = 30 ErrClusterAuthorizationFailed KError = 31 ErrInvalidTimestamp KError = 32 ErrUnsupportedSASLMechanism KError = 33 ErrIllegalSASLState KError = 34 ErrUnsupportedVersion KError = 35 ErrTopicAlreadyExists KError = 36 ErrInvalidPartitions KError = 37 ErrInvalidReplicationFactor KError = 38 ErrInvalidReplicaAssignment KError = 39 ErrInvalidConfig KError = 40 ErrNotController KError = 41 ErrInvalidRequest KError = 42 ErrUnsupportedForMessageFormat KError = 43 ErrPolicyViolation KError = 44 ErrOutOfOrderSequenceNumber KError = 45 ErrDuplicateSequenceNumber KError = 46 ErrInvalidProducerEpoch KError = 47 ErrInvalidTxnState KError = 48 ErrInvalidProducerIDMapping KError = 49 ErrInvalidTransactionTimeout KError = 50 ErrConcurrentTransactions KError = 51 ErrTransactionCoordinatorFenced KError = 52 ErrTransactionalIDAuthorizationFailed KError = 53 ErrSecurityDisabled KError = 54 ErrOperationNotAttempted KError = 55 ErrKafkaStorageError KError = 56 ErrLogDirNotFound KError = 57 ErrSASLAuthenticationFailed KError = 58 ErrUnknownProducerID KError = 59 ErrReassignmentInProgress KError = 60 ErrDelegationTokenAuthDisabled KError = 61 ErrDelegationTokenNotFound KError = 62 ErrDelegationTokenOwnerMismatch KError = 63 ErrDelegationTokenRequestNotAllowed KError = 64 ErrDelegationTokenAuthorizationFailed KError = 65 ErrDelegationTokenExpired KError = 66 ErrInvalidPrincipalType KError = 67 ErrNonEmptyGroup KError = 68 ErrGroupIDNotFound KError = 69 ErrFetchSessionIDNotFound KError = 70 ErrInvalidFetchSessionEpoch KError = 71 ErrListenerNotFound KError = 72 ErrTopicDeletionDisabled KError = 73 ErrFencedLeaderEpoch KError = 74 ErrUnknownLeaderEpoch KError = 75 ErrUnsupportedCompressionType KError = 76 ErrStaleBrokerEpoch KError = 77 ErrOffsetNotAvailable KError = 78 ErrMemberIdRequired KError = 79 ErrPreferredLeaderNotAvailable KError = 80 ErrGroupMaxSizeReached KError = 81 ErrFencedInstancedId KError = 82 ErrEligibleLeadersNotAvailable KError = 83 ErrElectionNotNeeded KError = 84 ErrNoReassignmentInProgress KError = 85 ErrGroupSubscribedToTopic KError = 86 ErrInvalidRecord KError = 87 ErrUnstableOffsetCommit KError = 88 ErrThrottlingQuotaExceeded KError = 89 ErrProducerFenced KError = 90 )
Numeric error codes returned by the Kafka server.
type KafkaVersion struct { }
KafkaVersion instances represent versions of the upstream Kafka broker.
ParseKafkaVersion parses and returns kafka version or error from a string
IsAtLeast returns true if and only if the version it is called on is greater than or equal to the version passed in:
V1.IsAtLeast(V2) // false V2.IsAtLeast(V1) // true
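For example, a version string can be parsed and used both to set Config.Version and to gate version-dependent behavior:

version, err := ParseKafkaVersion("2.8.1")
if err != nil {
	log.Fatalln(err)
}

config := NewConfig()
config.Version = version

if version.IsAtLeast(V0_11_0_0) {
	log.Println("record batches and headers are supported")
}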
NewKerberosClient creates a Kerberos client used to obtain TGT and TGS tokens. It uses a pure Go Kerberos 5 implementation (RFC 4121 and RFC 4120), built on the gokrb5 library, a pure Go Kerberos client with some GSS-API capabilities.
type ListGroupsRequest struct { Version int16 StatesFilter []string }
type ListPartitionReassignmentsRequest struct { TimeoutMs int32 Version int16 }
MatchingAcl is a matching acl type
type MemberIdentity struct { MemberId string GroupInstanceId *string }
Message is a kafka message type
Messages is a convenience helper which returns all the messages that are wrapped in this block
type MessageSet struct { PartialTrailingMessage bool OverflowMessage bool Messages []*MessageBlock }
type MetadataRequest struct { Version int16 Topics []string AllowAutoTopicCreation bool IncludeClusterAuthorizedOperations bool IncludeTopicAuthorizedOperations bool }
type MockAlterConfigsResponse struct { }
type MockAlterConfigsResponseWithErrorCode struct { }
type MockAlterPartitionReassignmentsResponse struct { }
type MockApiVersionsResponse struct { }
type MockBroker struct { }
MockBroker is a mock Kafka broker that is used in unit tests. It is exposed to facilitate testing of higher level or specialized consumers and producers built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol, but rather provides a facility to do that. It takes care of the TCP transport, request unmarshalling, and response marshaling, and makes it the test writer's responsibility to program MockBroker behavior that is correct according to the Kafka API protocol.
MockBroker is implemented as a TCP server listening on a kernel-selected localhost port that can accept many connections. It reads Kafka requests from that connection and returns responses programmed by the SetHandlerByMap function. If a MockBroker receives a request that it has no programmed response for, then it returns nothing and the request times out.
A set of MockRequest builders to define mappings used by MockBroker is provided by Sarama. But users can develop MockRequests of their own and use them along with or instead of the standard ones.
When running tests with MockBroker it is strongly recommended to specify a timeout to `go test` so that if the broker hangs waiting for a response, the test panics.
It is not necessary to prefix message length or correlation ID to your response bytes, the server does that automatically as a convenience.
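A hedged sketch of a test using MockBroker together with the mock response builders described later in this package (the map key is the request type name; NewTestConfig mirrors the other examples here):

func TestWithMockBroker(t *testing.T) {
	broker := NewMockBroker(t, 1)
	defer broker.Close()

	// Program the broker to answer metadata requests for "my_topic".
	broker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker.Addr(), broker.BrokerID()).
			SetLeader("my_topic", 0, broker.BrokerID()),
	})

	client, err := NewClient([]string{broker.Addr()}, NewTestConfig())
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()
}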
NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the test framework and a channel of responses to use. If an error occurs it is simply logged to the TestReporter and the broker exits.
NewMockBrokerAddr behaves like newMockBroker but listens on the address you give it rather than just some ephemeral port.
NewMockBrokerListener behaves like newMockBrokerAddr but accepts connections on the listener specified.
Addr returns the broker connection string in the form "<address>:<port>".
BrokerID returns the broker ID assigned to the broker.
Close terminates the broker, blocking until it stops internal goroutines and releases all resources.
History returns a slice of RequestResponse pairs in the order they were processed by the broker. Note that in case of multiple connections to the broker the order expected by a test can be different from the order recorded in the history, unless some synchronization is implemented in the test.
Port returns the TCP port number the broker is listening for requests on.
SetHandlerByMap defines a mapping of request types to MockResponses. When a request is received by the broker, it looks up the request type in the map and uses the found MockResponse instance to generate an appropriate reply. If the request type is not found in the map then nothing is sent.
func (b *MockBroker) SetHandlerFuncByMap(handlerMap map[string]requestHandlerFunc)
SetHandlerFuncByMap defines a mapping of request types to requestHandlerFuncs. When a request is received by the broker, it looks up the request type in the map and invokes the found requestHandlerFunc instance to generate an appropriate reply.
SetLatency makes the broker pause for the specified period every time before replying.
SetNotifier sets a function that will be invoked whenever a request has been processed successfully; it will provide the number of bytes read and written.
type MockConsumerMetadataResponse struct { }
MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
type MockCreateAclsResponse struct { }
type MockCreateAclsResponseError struct { }
type MockCreatePartitionsResponse struct { }
type MockCreateTopicsResponse struct { }
type MockDeleteAclsResponse struct { }
type MockDeleteGroupsResponse struct { }
type MockDeleteOffsetResponse struct { }
type MockDeleteRecordsResponse struct { }
type MockDeleteTopicsResponse struct { }
type MockDescribeConfigsResponse struct { }
type MockDescribeConfigsResponseWithErrorCode struct { }
type MockDescribeGroupsResponse struct { }
type MockDescribeLogDirsResponse struct { }
type MockElectLeadersResponse struct { }
type MockFetchResponse struct { }
MockFetchResponse is a `FetchResponse` builder.
type MockFindCoordinatorResponse struct { }
MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
type MockHeartbeatResponse struct { Err KError }
type MockIncrementalAlterConfigsResponse struct { }
type MockIncrementalAlterConfigsResponseWithErrorCode struct { }
type MockInitProducerIDResponse struct { }
MockInitProducerIDResponse is an `InitProducerIDResponse` builder.
type MockLeaveGroupResponse struct { Err KError }
type MockListAclsResponse struct { }
type MockListGroupsResponse struct { }
type MockListPartitionReassignmentsResponse struct { }
type MockMetadataResponse struct { }
MockMetadataResponse is a `MetadataResponse` builder.
type MockOffsetCommitResponse struct { }
MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
type MockOffsetFetchResponse struct { }
MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
type MockOffsetResponse struct { }
MockOffsetResponse is an `OffsetResponse` builder.
type MockProduceResponse struct { }
MockProduceResponse is a `ProduceResponse` builder.
type MockResponse interface { For(reqBody versionedDecoder) (res encoderWithHeader) }
MockResponse is a response builder interface; it defines one method that allows generating a response based on a request body. MockResponses are used to program the behavior of MockBroker in tests.
type MockSaslAuthenticateResponse struct { }
type MockSaslHandshakeResponse struct { }
type MockSequence struct { }
MockSequence is a mock response builder that is created from a sequence of concrete responses. Every time a `MockBroker` calls its `For` method, the next response from the sequence is returned. When the end of the sequence is reached, the last element from the sequence is returned.
func NewMockSequence(responses ...interface{}) *MockSequence
func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader)
type MockSyncGroupResponse struct { Err KError MemberAssignment []byte }
type MockWrapper struct { }
MockWrapper is a mock response builder that returns a particular concrete response regardless of the actual request passed to the `For` method.
func NewMockWrapper(res encoderWithHeader) *MockWrapper
func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader)
type OffsetFetchRequest struct { Version int16 ConsumerGroup string RequireStable bool }
OffsetManager uses Kafka to store and fetch consumed partition offsets.
NewOffsetManagerFromClient creates a new OffsetManager from the given client. It is still necessary to call Close() on the underlying client when finished with the partition manager.
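A sketch of manual offset management (assuming an existing Client named client; group, topic and partition values are placeholders):

om, err := NewOffsetManagerFromClient("my-group", client)
if err != nil {
	log.Fatalln(err)
}
defer om.Close()

pom, err := om.ManagePartition("my_topic", 0)
if err != nil {
	log.Fatalln(err)
}
defer pom.Close()

next, metadata := pom.NextOffset() // where to resume consuming
_ = metadata
log.Println("resume from offset", next)

pom.MarkOffset(next+1, "") // mark a message as processed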
type OwnedPartition struct { Topic string Partitions []int32 }
type PacketDecodingError struct { Info string }
PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.
type PacketEncodingError struct { Info string }
PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.
The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported as out of range by the brokers. In this case you should decide what you want to do (try a different offset, notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying. By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will also drain the Messages channel, harvest all errors & return them once cleanup has completed.
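A minimal sketch of the for/range pattern described above; the broker address and topic are placeholders.

consumer, err := NewConsumer([]string{"localhost:9092"}, NewTestConfig())
if err != nil {
    log.Fatalln(err)
}
defer func() {
    if err := consumer.Close(); err != nil {
        log.Fatalln(err)
    }
}()

pc, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
if err != nil {
    log.Fatalln(err)
}

// Call pc.AsyncClose() from another goroutine (e.g. on SIGINT) to make this
// loop terminate once the Messages channel has been drained and closed.
for msg := range pc.Messages() {
    log.Printf("offset %d: %s", msg.Offset, string(msg.Value))
}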
type PartitionError struct { Partition int32 Err KError }
PartitionError is a partition error type
PartitionMetadata contains each partition in the topic.
PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close() on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.
type PartitionReplicaReassignmentsStatus struct { Replicas []int32 AddingReplicas []int32 RemovingReplicas []int32 }
type PartitionResult struct { ErrorCode KError ErrorMessage *string }
Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.
This example shows how to assign partitions to your messages manually.
config := NewTestConfig()
// First, we tell the producer that we are going to partition ourselves.
config.Producer.Partitioner = NewManualPartitioner
producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    log.Println(err)
    return
}
defer func() {
    if err := producer.Close(); err != nil {
        log.Println("Failed to close producer:", err)
    }
}()
// Now, we set the Partition field of the ProducerMessage struct.
msg := &ProducerMessage{Topic: "test", Partition: 6, Value: StringEncoder("test")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Println("Failed to produce message to kafka cluster.")
    return
}
if partition != 6 {
    log.Println("Message should have been produced to partition 6!")
    return
}
log.Printf("Produced message to partition %d with offset %d", partition, offset)
This example shows how to set a different partitioner depending on the topic.
config := NewTestConfig()
config.Producer.Partitioner = func(topic string) Partitioner {
    switch topic {
    case "access_log", "error_log":
        return NewRandomPartitioner(topic)
    default:
        return NewHashPartitioner(topic)
    }
}
// ...
By default, Sarama uses the message's key to consistently assign a partition to a message using hashing. If no key is set, a random partition will be chosen. This example shows how you can partition messages randomly, even when a key is set, by overriding Config.Producer.Partitioner.
config := NewTestConfig()
config.Producer.Partitioner = NewRandomPartitioner
producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    log.Println(err)
    return
}
defer func() {
    if err := producer.Close(); err != nil {
        log.Println("Failed to close producer:", err)
    }
}()
msg := &ProducerMessage{Topic: "test", Key: StringEncoder("key is set"), Value: StringEncoder("test")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Println("Failed to produce message to kafka cluster.")
    return
}
log.Printf("Produced message to partition %d with offset %d", partition, offset)
NewConsistentCRCHashPartitioner is like NewHashPartitioner except that it uses the *unsigned* crc32 hash of the encoded bytes of the message key, modulus the number of partitions. This is compatible with librdkafka's `consistent_random` partitioner.
NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used, modulus the number of partitions. This ensures that messages with the same key always end up on the same partition.
NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided ProducerMessage's Partition field as the partition to produce to.
NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values in the same way as the reference Java implementation. NewHashPartitioner was supposed to do that but it had a mistake and now there are people depending on both behaviors. This will all go away on the next major version bump.
NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
PartitionerConstructor is the type for a function capable of constructing new Partitioners.
NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher. The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options
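A short sketch of the custom-hasher hook, using the standard library's FNV-1a constructor from hash/fnv (the same hash family the default partitioner uses):

config := NewTestConfig()
// Each partition dispatcher gets its own hash.Hash32 instance from this constructor.
config.Producer.Partitioner = NewCustomHashPartitioner(fnv.New32a)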
ProduceCallback function is called once the produce response has been parsed or could not be read.
partition_responses in protocol
ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.
ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface. It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel when closing a producer.
ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerTxnStatusFlag int16
ProducerTxnStatusFlag marks the current transaction status.
const (
    ProducerTxnFlagUninitialized ProducerTxnStatusFlag = 1 << iota
    ProducerTxnFlagInitializing
    ProducerTxnFlagReady
    ProducerTxnFlagInTransaction
    ProducerTxnFlagEndTransaction
    ProducerTxnFlagInError
    ProducerTxnFlagCommittingTransaction
    ProducerTxnFlagAbortingTransaction
    ProducerTxnFlagAbortableError
    ProducerTxnFlagFatalError
)
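These flags form a bitmask. A hedged sketch of checking a transactional AsyncProducer's status follows; the transactional id and broker address are placeholders, and the configuration shown (idempotence, WaitForAll, a single in-flight request, a sufficiently recent Version) reflects the usual prerequisites for transactions.

config := NewTestConfig()
config.Version = V2_1_0_0
config.Producer.Idempotent = true
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1
config.Producer.Transaction.ID = "my-transactional-id"

producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    log.Fatalln(err)
}

// TxnStatus returns a ProducerTxnStatusFlag bitmask.
if producer.TxnStatus()&ProducerTxnFlagReady != 0 {
    if err := producer.BeginTxn(); err != nil {
        log.Fatalln(err)
    }
}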
Describes a component for applying a client quota filter.
EntityType: the entity type the filter component applies to ("user", "client-id", "ip")
MatchType: the match type of the filter component (any, exact, default)
Match: the name that's matched exactly (used when MatchType is QuotaMatchExact)
Record is the Kafka record type.
type RecordHeader struct { }
RecordHeader stores key and value for a record header
Records implements a union type containing either a RecordBatch or a legacy MessageSet.
type RequestNotifierFunc func(bytesRead, bytesWritten int)
RequestNotifierFunc is invoked when a mock broker processes a request successfully and provides the number of bytes read and written.
type RequestResponse struct { Request protocolBody Response encoder }
RequestResponse represents a Request/Response pair processed by MockBroker.
RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.isr` value in the brokers configuration).
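For example, a producer that waits for the full in-sync replica set to acknowledge every message; the other provided constants are shown commented out.

config := NewTestConfig()
config.Producer.RequiredAcks = WaitForAll // -1: wait for all in-sync replicas
// config.Producer.RequiredAcks = WaitForLocal // 1: wait for the leader only
// config.Producer.RequiredAcks = NoResponse   // 0: do not wait for any acknowledgement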
Resource holds information about an ACL resource type.
ResourceAcls is an ACL resource type.
SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
SCRAMClient is an interface to a SCRAM client implementation.
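A hedged sketch of enabling SASL authentication; the credentials are placeholders, and `myScramClient` stands in for whatever SCRAMClient implementation you provide.

config := NewTestConfig()
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = SASLTypePlaintext // or SASLTypeSCRAMSHA256 / SASLTypeSCRAMSHA512
config.Net.SASL.User = "my-user"
config.Net.SASL.Password = "my-password"

// SCRAM mechanisms additionally need a SCRAMClient generator, e.g.:
// config.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return &myScramClient{} }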
type SaslAuthenticateRequest struct { Version int16 SaslAuthBytes []byte }
type SaslAuthenticateResponse struct { Version int16 Err KError ErrorMessage *string SaslAuthBytes []byte SessionLifetimeMs int64 }
type ScramMechanismType int8
type StdLogger interface { Print(v ...interface{}) Printf(format string, v ...interface{}) Println(v ...interface{}) }
StdLogger is used to log error messages.
DebugLogger is the instance of a StdLogger that Sarama writes more verbose debug information to. By default it is set to redirect all debug output to the default Logger above, but you can optionally set it to another StdLogger instance to, for example, discard debug information.
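A short sketch of wiring both loggers to the standard library's log package (using log, os and io from the standard library):

// Route Sarama's informational output to stderr with a recognisable prefix.
Logger = log.New(os.Stderr, "[sarama] ", log.LstdFlags)

// Either surface the verbose output separately, or silence it explicitly.
DebugLogger = log.New(io.Discard, "", 0)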
type StickyAssignorUserData interface { }
type StickyAssignorUserDataV0 struct { Topics map[string][]int32 }
StickyAssignorUserDataV0 holds topic partition information for an assignment
type StickyAssignorUserDataV1 struct { Topics map[string][]int32 Generation int32 }
StickyAssignorUserDataV1 holds topic partition information for an assignment
StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.
type SyncGroupRequestAssignment struct { MemberId string Assignment []byte }
type SyncGroupResponse struct { Version int16 ThrottleTime int32 Err KError MemberAssignment []byte }
SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks; it may not be garbage-collected automatically when it passes out of scope.
The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual durability guarantee provided when a message is acknowledged depends on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to be set to true in its configuration.
This example shows the basic usage pattern of the SyncProducer.
producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
    log.Fatalln(err)
}
defer func() {
    if err := producer.Close(); err != nil {
        log.Fatalln(err)
    }
}()

msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Printf("FAILED to send message: %s\n", err)
} else {
    log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}
NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.
type TestReporter interface { Error(...interface{}) Errorf(string, ...interface{}) Fatal(...interface{}) Fatalf(string, ...interface{}) Helper() }
TestReporter has methods matching go's testing.T to avoid importing `testing` in the main part of the library.
TopicMetadata contains each topic in the response.
type TopicPartition struct { Count int32 Assignment [][]int32 }
type TopicPartitionError struct { Err KError ErrMsg *string }
type ZstdDecoderParams struct { }
type ZstdEncoderParams struct { Level int }