A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://pkg.go.dev/github.com/IBM/sarama below:

sarama package - github.com/IBM/sarama - Go Packages

Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later). It includes a high-level API for easily producing and consuming messages, and a low-level API for controlling bytes on the wire when the high-level API is insufficient. Usage examples for the high-level APIs are provided inline with their full documentation.

To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.

To consume messages, use Consumer or Consumer-Group API.

For lower-level needs, the Broker and Request/Response objects permit precise control over each connection and message sent on the wire; the Client provides higher-level metadata management that is shared between the producers and the consumer. The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

Metrics are exposed through https://github.com/rcrowley/go-metrics library in a local registry.

Broker related metrics:

+---------------------------------------------------------+------------+---------------------------------------------------------------+
| Name                                                    | Type       | Description                                                   |
+---------------------------------------------------------+------------+---------------------------------------------------------------+
| incoming-byte-rate                                      | meter      | Bytes/second read off all brokers                             |
| incoming-byte-rate-for-broker-<broker-id>               | meter      | Bytes/second read off a given broker                          |
| outgoing-byte-rate                                      | meter      | Bytes/second written off all brokers                          |
| outgoing-byte-rate-for-broker-<broker-id>               | meter      | Bytes/second written off a given broker                       |
| request-rate                                            | meter      | Requests/second sent to all brokers                           |
| request-rate-for-broker-<broker-id>                     | meter      | Requests/second sent to a given broker                        |
| request-size                                            | histogram  | Distribution of the request size in bytes for all brokers     |
| request-size-for-broker-<broker-id>                     | histogram  | Distribution of the request size in bytes for a given broker  |
| request-latency-in-ms                                   | histogram  | Distribution of the request latency in ms for all brokers     |
| request-latency-in-ms-for-broker-<broker-id>            | histogram  | Distribution of the request latency in ms for a given broker  |
| response-rate                                           | meter      | Responses/second received from all brokers                    |
| response-rate-for-broker-<broker-id>                    | meter      | Responses/second received from a given broker                 |
| response-size                                           | histogram  | Distribution of the response size in bytes for all brokers    |
| response-size-for-broker-<broker-id>                    | histogram  | Distribution of the response size in bytes for a given broker |
| requests-in-flight                                      | counter    | The current number of in-flight requests awaiting a response  |
|                                                         |            | for all brokers                                               |
| requests-in-flight-for-broker-<broker-id>               | counter    | The current number of in-flight requests awaiting a response  |
|                                                         |            | for a given broker                                            |
| protocol-requests-rate-<api-key>          	          | meter      | Number of api requests sent to the brokers for all brokers    |
|                                                         |            | https://kafka.apache.org/protocol.html#protocol_api_keys      |                                        |
| protocol-requests-rate-<api-key>-for-broker-<broker-id> | meter      | Number of packets sent to the brokers by api-key for a given  |
|                                                         |            | broker                                                        |
+---------------------------------------------------------+------------+---------------------------------------------------------------+

Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.

Producer related metrics:

+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| Name                                      | Type       | Description                                                                          |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| batch-size                                | histogram  | Distribution of the number of bytes sent per partition per request for all topics    |
| batch-size-for-topic-<topic>              | histogram  | Distribution of the number of bytes sent per partition per request for a given topic |
| record-send-rate                          | meter      | Records/second sent to all topics                                                    |
| record-send-rate-for-topic-<topic>        | meter      | Records/second sent to a given topic                                                 |
| records-per-request                       | histogram  | Distribution of the number of records sent per request for all topics                |
| records-per-request-for-topic-<topic>     | histogram  | Distribution of the number of records sent per request for a given topic             |
| compression-ratio                         | histogram  | Distribution of the compression ratio times 100 of record batches for all topics     |
| compression-ratio-for-topic-<topic>       | histogram  | Distribution of the compression ratio times 100 of record batches for a given topic  |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+

Consumer related metrics:

+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| Name                                      | Type       | Description                                                                          |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| consumer-batch-size                       | histogram  | Distribution of the number of messages in a batch                                    |
| consumer-fetch-rate                       | meter      | Fetch requests/second sent to all brokers                                            |
| consumer-fetch-rate-for-broker-<broker>   | meter      | Fetch requests/second sent to a given broker                                         |
| consumer-fetch-rate-for-topic-<topic>     | meter      | Fetch requests/second sent for a given topic                                         |
| consumer-fetch-response-size              | histogram  | Distribution of the fetch response size in bytes                                     |
| consumer-group-join-total-<GroupID>       | counter    | Total count of consumer group join attempts                                          |
| consumer-group-join-failed-<GroupID>      | counter    | Total count of consumer group join failures                                          |
| consumer-group-sync-total-<GroupID>       | counter    | Total count of consumer group sync attempts                                          |
| consumer-group-sync-failed-<GroupID>      | counter    | Total count of consumer group sync failures                                          |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
View Source
const (
	
	RangeBalanceStrategyName = "range"

	
	RoundRobinBalanceStrategyName = "roundrobin"

	
	StickyBalanceStrategyName = "sticky"
)
View Source
const (
	
	SASLTypeOAuth = "OAUTHBEARER"
	
	SASLTypePlaintext = "PLAIN"
	
	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
	
	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
	SASLTypeGSSAPI      = "GSSAPI"
	
	
	SASLHandshakeV0 = int16(0)
	
	
	SASLHandshakeV1 = int16(1)
	
	
	SASLExtKeyAuth = "auth"
)
View Source
const (
	TOK_ID_KRB_AP_REQ   = 256
	GSS_API_GENERIC_TAG = 0x60
	KRB5_USER_AUTH      = 1
	KRB5_KEYTAB_AUTH    = 2
	KRB5_CCACHE_AUTH    = 3
	GSS_API_INITIAL     = 1
	GSS_API_VERIFY      = 2
	GSS_API_FINISH      = 3
)

APIKeySASLAuth is the API key for the SaslAuthenticate Kafka API

GroupGenerationUndefined is a special value for the group generation field of Offset Commit Requests that should be used when a consumer group does not rely on Kafka for partition management.

ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received. The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.

View Source
var (
	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
	V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
	V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
	V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
	V0_10_2_2 = newKafkaVersion(0, 10, 2, 2)
	V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
	V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
	V1_0_1_0  = newKafkaVersion(1, 0, 1, 0)
	V1_0_2_0  = newKafkaVersion(1, 0, 2, 0)
	V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
	V1_1_1_0  = newKafkaVersion(1, 1, 1, 0)
	V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)
	V2_0_1_0  = newKafkaVersion(2, 0, 1, 0)
	V2_1_0_0  = newKafkaVersion(2, 1, 0, 0)
	V2_1_1_0  = newKafkaVersion(2, 1, 1, 0)
	V2_2_0_0  = newKafkaVersion(2, 2, 0, 0)
	V2_2_1_0  = newKafkaVersion(2, 2, 1, 0)
	V2_2_2_0  = newKafkaVersion(2, 2, 2, 0)
	V2_3_0_0  = newKafkaVersion(2, 3, 0, 0)
	V2_3_1_0  = newKafkaVersion(2, 3, 1, 0)
	V2_4_0_0  = newKafkaVersion(2, 4, 0, 0)
	V2_4_1_0  = newKafkaVersion(2, 4, 1, 0)
	V2_5_0_0  = newKafkaVersion(2, 5, 0, 0)
	V2_5_1_0  = newKafkaVersion(2, 5, 1, 0)
	V2_6_0_0  = newKafkaVersion(2, 6, 0, 0)
	V2_6_1_0  = newKafkaVersion(2, 6, 1, 0)
	V2_6_2_0  = newKafkaVersion(2, 6, 2, 0)
	V2_6_3_0  = newKafkaVersion(2, 6, 3, 0)
	V2_7_0_0  = newKafkaVersion(2, 7, 0, 0)
	V2_7_1_0  = newKafkaVersion(2, 7, 1, 0)
	V2_7_2_0  = newKafkaVersion(2, 7, 2, 0)
	V2_8_0_0  = newKafkaVersion(2, 8, 0, 0)
	V2_8_1_0  = newKafkaVersion(2, 8, 1, 0)
	V2_8_2_0  = newKafkaVersion(2, 8, 2, 0)
	V3_0_0_0  = newKafkaVersion(3, 0, 0, 0)
	V3_0_1_0  = newKafkaVersion(3, 0, 1, 0)
	V3_0_2_0  = newKafkaVersion(3, 0, 2, 0)
	V3_1_0_0  = newKafkaVersion(3, 1, 0, 0)
	V3_1_1_0  = newKafkaVersion(3, 1, 1, 0)
	V3_1_2_0  = newKafkaVersion(3, 1, 2, 0)
	V3_2_0_0  = newKafkaVersion(3, 2, 0, 0)
	V3_2_1_0  = newKafkaVersion(3, 2, 1, 0)
	V3_2_2_0  = newKafkaVersion(3, 2, 2, 0)
	V3_2_3_0  = newKafkaVersion(3, 2, 3, 0)
	V3_3_0_0  = newKafkaVersion(3, 3, 0, 0)
	V3_3_1_0  = newKafkaVersion(3, 3, 1, 0)
	V3_3_2_0  = newKafkaVersion(3, 3, 2, 0)
	V3_4_0_0  = newKafkaVersion(3, 4, 0, 0)
	V3_4_1_0  = newKafkaVersion(3, 4, 1, 0)
	V3_5_0_0  = newKafkaVersion(3, 5, 0, 0)
	V3_5_1_0  = newKafkaVersion(3, 5, 1, 0)
	V3_5_2_0  = newKafkaVersion(3, 5, 2, 0)
	V3_6_0_0  = newKafkaVersion(3, 6, 0, 0)
	V3_6_1_0  = newKafkaVersion(3, 6, 1, 0)
	V3_6_2_0  = newKafkaVersion(3, 6, 2, 0)
	V3_7_0_0  = newKafkaVersion(3, 7, 0, 0)
	V3_7_1_0  = newKafkaVersion(3, 7, 1, 0)
	V3_7_2_0  = newKafkaVersion(3, 7, 2, 0)
	V3_8_0_0  = newKafkaVersion(3, 8, 0, 0)
	V3_8_1_0  = newKafkaVersion(3, 8, 1, 0)
	V3_9_0_0  = newKafkaVersion(3, 9, 0, 0)
	V4_0_0_0  = newKafkaVersion(4, 0, 0, 0)

	SupportedVersions = []KafkaVersion{
		V0_8_2_0,
		V0_8_2_1,
		V0_8_2_2,
		V0_9_0_0,
		V0_9_0_1,
		V0_10_0_0,
		V0_10_0_1,
		V0_10_1_0,
		V0_10_1_1,
		V0_10_2_0,
		V0_10_2_1,
		V0_10_2_2,
		V0_11_0_0,
		V0_11_0_1,
		V0_11_0_2,
		V1_0_0_0,
		V1_0_1_0,
		V1_0_2_0,
		V1_1_0_0,
		V1_1_1_0,
		V2_0_0_0,
		V2_0_1_0,
		V2_1_0_0,
		V2_1_1_0,
		V2_2_0_0,
		V2_2_1_0,
		V2_2_2_0,
		V2_3_0_0,
		V2_3_1_0,
		V2_4_0_0,
		V2_4_1_0,
		V2_5_0_0,
		V2_5_1_0,
		V2_6_0_0,
		V2_6_1_0,
		V2_6_2_0,
		V2_6_3_0,
		V2_7_0_0,
		V2_7_1_0,
		V2_7_2_0,
		V2_8_0_0,
		V2_8_1_0,
		V2_8_2_0,
		V3_0_0_0,
		V3_0_1_0,
		V3_0_2_0,
		V3_1_0_0,
		V3_1_1_0,
		V3_1_2_0,
		V3_2_0_0,
		V3_2_1_0,
		V3_2_2_0,
		V3_2_3_0,
		V3_3_0_0,
		V3_3_1_0,
		V3_3_2_0,
		V3_4_0_0,
		V3_4_1_0,
		V3_5_0_0,
		V3_5_1_0,
		V3_5_2_0,
		V3_6_0_0,
		V3_6_1_0,
		V3_6_2_0,
		V3_7_0_0,
		V3_7_1_0,
		V3_7_2_0,
		V3_8_0_0,
		V3_8_1_0,
		V3_9_0_0,
		V4_0_0_0,
	}
	MinVersion     = V0_8_2_0
	MaxVersion     = V4_0_0_0
	DefaultVersion = V2_1_0_0
)

Effective constants defining the supported kafka versions.

Deprecated: use NewBalanceStrategyRange to avoid data race issue

Deprecated: use NewBalanceStrategyRoundRobin to avoid data race issue

Deprecated: use NewBalanceStrategySticky to avoid data race issue

View Source
var ErrAddPartitionsToTxn = errors.New("transaction manager: failed to send partitions to transaction")

ErrAddPartitionsToTxn is returned when AddPartitionsToTxn failed multiple times

ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.

ErrBrokerNotFound is the error returned when there's no broker found for the requested ID.

View Source
var ErrCannotTransitionNilError = errors.New("transaction manager: cannot transition with a nil error")

ErrCannotTransitionNilError when transition is attempted with an nil error.

ErrClosedClient is the error returned when a method is called on a client that has been closed.

View Source
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.

View Source
var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")

ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing a RecordBatch.

ErrControllerNotAvailable is returned when server didn't give correct controller id. May be kafka server's version is lower than 0.10.0.0.

ErrCreateACLs is the type of error returned when ACL creation failed

ErrDeleteRecords is the type of error returned when fail to delete the required records

View Source
var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")

ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.

View Source
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")

ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.

View Source
var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")

ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).

ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max

View Source
var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")

ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update the metadata.

View Source
var ErrNonTransactedProducer = errors.New("transaction manager: you need to add TransactionalID to producer")

ErrNonTransactedProducer when calling BeginTxn, CommitTxn or AbortTxn on a non transactional producer.

ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.

View Source
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to")

ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.

View Source
var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow")

ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied.

ErrReassignPartitions is returned when altering partition assignments for a topic fails

View Source
var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")

ErrShuttingDown is returned when a producer receives a message during shutdown.

View Source
var ErrTransactionNotReady = errors.New("transaction manager: transaction is not ready")

ErrTransactionNotReady when transaction status is invalid for the current action.

View Source
var ErrTransitionNotAllowed = errors.New("transaction manager: invalid transition attempted")

ErrTransitionNotAllowed when txnmgr state transition is not valid.

View Source
var ErrTxnOffsetCommit = errors.New("transaction manager: failed to send offsets to transaction")

ErrTxnOffsetCommit is returned when TxnOffsetCommit failed multiple times

View Source
var ErrTxnUnableToParseResponse = errors.New("transaction manager: unable to parse response")

ErrTxnUnableToParseResponse when response is nil

ErrUnknownScramMechanism is returned when user tries to AlterUserScramCredentials with unknown SCRAM mechanism

MultiErrorFormat specifies the formatter applied to format multierrors. The default implementation is a condensed version of the hashicorp/go-multierror default one

View Source
var NullUUID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}

NewExponentialBackoff returns a function that implements an exponential backoff strategy with jitter. It follows KIP-580, implementing the formula: MIN(retry.backoff.max.ms, (retry.backoff.ms * 2**(failures - 1)) * random(0.8, 1.2)) This ensures retries start with `backoff` and exponentially increase until `maxBackoff`, with added jitter. The behavior when `failures = 0` is not explicitly defined in KIP-580 and is left to implementation discretion.

Example usage:

backoffFunc := sarama.NewExponentialBackoff(config.Producer.Retry.Backoff, 2*time.Second)
config.Producer.Retry.BackoffFunc = backoffFunc
func Wrap(sentinel error, wrapped ...error) sentinelError
type AbortedTransaction struct {
	
	ProducerID int64
	
	FirstOffset int64
}

AccessToken contains an access token used to authenticate a SASL/OAUTHBEARER client along with associated metadata.

AccessTokenProvider is the interface that encapsulates how implementors can generate access tokens for Kafka broker authentication.

Acl holds information about acl type

AclCreation is a wrapper around Resource and Acl type

type AclCreationResponse struct {
	Err    KError
	ErrMsg *string
}

AclCreationResponse is an acl creation response type

MarshalText returns the text form of the AclOperation (name without prefix)

UnmarshalText takes a text representation of the operation and converts it to an AclOperation

type AclPermissionType int

MarshalText returns the text form of the AclPermissionType (name without prefix)

UnmarshalText takes a text representation of the permission type and converts it to an AclPermissionType

type AclResourcePatternType int

MarshalText returns the text form of the AclResourcePatternType (name without prefix)

UnmarshalText takes a text representation of the resource pattern type and converts it to an AclResourcePatternType

MarshalText returns the text form of the AclResourceType (name without prefix)

UnmarshalText takes a text representation of the resource type and converts it to an AclResourceType

AddOffsetsToTxnRequest adds offsets to a transaction request

AddOffsetsToTxnResponse is a response type for adding offsets to txns

AddPartitionsToTxnRequest is a add partition request

AddPartitionsToTxnResponse is a partition errors to transaction type

AlterConfigsRequest is an alter config request type

AlterConfigsResource is an alter config resource type

AlterConfigsResourceResponse is a response type for alter config resource

AlterConfigsResponse is a response type for alter config

type AlterPartitionReassignmentsRequest struct {
	TimeoutMs int32

	Version int16
	
}
type AlterPartitionReassignmentsResponse struct {
	Version        int16
	ThrottleTimeMs int32
	ErrorCode      KError
	ErrorMessage   *string
	Errors         map[string]map[int32]*alterPartitionReassignmentsErrorBlock
}
type AlterUserScramCredentialsResult struct {
	User string

	ErrorCode    KError
	ErrorMessage *string
}
type ApiVersionsRequest struct {
	
	Version int16
	
	ClientSoftwareName string
	
	ClientSoftwareVersion string
}

ApiVersionsResponseKey contains the APIs supported by the broker.

AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid leaks and message lost: it will not be garbage-collected automatically when it passes out of scope and buffered messages may not be flushed.

This example shows how to use the producer with separate goroutines reading from the Successes and Errors channels. Note that in order for the Successes channel to be populated, you have to set config.Producer.Return.Successes to true.

config := NewTestConfig()
config.Producer.Return.Successes = true
producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	panic(err)
}

// Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var (
	wg                                  sync.WaitGroup
	enqueued, successes, producerErrors int
)

wg.Add(1)
go func() {
	defer wg.Done()
	for range producer.Successes() {
		successes++
	}
}()

wg.Add(1)
go func() {
	defer wg.Done()
	for err := range producer.Errors() {
		log.Println(err)
		producerErrors++
	}
}()

ProducerLoop:
for {
	message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
	select {
	case producer.Input() <- message:
		enqueued++

	case <-signals:
		producer.AsyncClose() // Trigger a shutdown of the producer.
		break ProducerLoop
	}
}

wg.Wait()

log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)

This example shows how to use the producer while simultaneously reading the Errors channel to know about any failures.

producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
	panic(err)
}

defer func() {
	if err := producer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var enqueued, producerErrors int
ProducerLoop:
for {
	select {
	case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
		enqueued++
	case err := <-producer.Errors():
		log.Println("Failed to produce message", err)
		producerErrors++
	case <-signals:
		break ProducerLoop
	}
}

log.Printf("Enqueued: %d; errors: %d\n", enqueued, producerErrors)

NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.

NewAsyncProducerFromClient creates a new Producer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.

BalanceStrategy is used to balance topics and partitions across members of a consumer group

NewBalanceStrategyRange returns a range balance strategy, which is the default and assigns partitions as ranges to consumer group members. This follows the same logic as https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html

Example with two topics T1 and T2 with six partitions each (0..5) and two members (M1, M2):

M1: {T1: [0, 1, 2], T2: [0, 1, 2]}
M2: {T1: [3, 4, 5], T2: [3, 4, 5]}

NewBalanceStrategyRoundRobin returns a round-robin balance strategy, which assigns partitions to members in alternating order. For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2): M0: [t0p0, t0p2, t1p1] M1: [t0p1, t1p0, t1p2]

NewBalanceStrategySticky returns a sticky balance strategy, which assigns partitions to members with an attempt to preserve earlier assignments while maintain a balanced partition distribution. Example with topic T with six partitions (0..5) and two members (M1, M2):

M1: {T: [0, 2, 4]}
M2: {T: [1, 3, 5]}

On reassignment with an additional consumer, you might get an assignment plan like:

M1: {T: [0, 2]}
M2: {T: [1, 3]}
M3: {T: [4, 5]}

BalanceStrategyPlan is the results of any BalanceStrategy.Plan attempt. It contains an allocation of topic/partitions by memberID in the form of a `memberID -> topic -> partitions` map.

Add assigns a topic with a number partitions to a member.

Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.

broker := NewBroker("localhost:9092")
err := broker.Open(nil)
if err != nil {
	panic(err)
}

request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata(&request)
if err != nil {
	_ = broker.Close()
	panic(err)
}

fmt.Println("There are", len(response.Topics), "topics active in the cluster.")

if err = broker.Close(); err != nil {
	panic(err)
}

NewBroker creates and returns a Broker targeting the given host:port address. This does not attempt to actually connect, you have to call Open() for that.

AddOffsetsToTxn sends a request to add offsets to txn and returns a response or error

AddPartitionsToTxn send a request to add partition to txn and returns a response or error

Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.

AlterClientQuotas sends a request to alter the broker's quotas

AlterConfigs sends a request to alter config and return a response or error

AlterPartitionReassignments sends a alter partition reassignments request and returns alter partition reassignments response

ApiVersions return api version response or error

AsyncProduce sends a produce request and eventually call the provided callback with a produce response or an error.

Waiting for the response is generally not blocking on the contrary to using Produce. If the maximum number of in flight request configured is reached then the request will be blocked till a previous response is received.

When configured with RequiredAcks == NoResponse, the callback will not be invoked. If an error is returned because the request could not be sent then the callback will not be invoked either.

Make sure not to Close the broker in the callback as it will lead to a deadlock.

Close closes the broker resources

CommitOffset return an Offset commit response or error

Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.

CreateAcls sends a create acl request and returns a response or error

CreatePartitions sends a create partition request and returns create partitions response or error

CreateTopics send a create topic request and returns create topic response

DeleteAcls sends a delete acl request and returns a response or error

DeleteGroups sends a request to delete groups and returns a response or error

DeleteOffsets sends a request to delete group offsets and returns a response or error

DeleteRecords send a request to delete records and return delete record response or error

DeleteTopics sends a delete topic request and returns delete topic response

DescribeAcls sends a describe acl request and returns a response or error

DescribeClientQuotas sends a request to get the broker's quotas

DescribeConfigs sends a request to describe config and returns a response or error

DescribeGroups return describe group response or error

DescribeLogDirs sends a request to get the broker's log dir paths and sizes

DescribeUserScramCredentials sends a request to get SCRAM users

ElectLeaders sends aa elect leaders request and returns list partitions elect result

EndTxn sends a request to end txn and returns a response or error

Fetch returns a FetchResponse or error

FetchOffset returns an offset fetch response or error

FindCoordinator sends a find coordinate request and returns a response or error

GetAvailableOffsets return an offset response or error

GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error

GetMetadata send a metadata request and returns a metadata response or error

Heartbeat returns a heartbeat response or error

ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.

IncrementalAlterConfigs sends a request to incremental alter config and return a response or error

InitProducerID sends an init producer request and returns a response or error

JoinGroup returns a join group response or error

LeaveGroup return a leave group response or error

ListGroups return a list group response or error

ListPartitionReassignments sends a list partition reassignments request and returns list partition reassignments response

Open tries to connect to the Broker if it is not already connected or connecting, but does not block waiting for the connection to complete. This means that any subsequent operations on the broker will block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of NewConfig() is used.

Produce returns a produce response or error

Rack returns the broker's rack as retrieved from Kafka's metadata or the empty string if it is not known. The returned value corresponds to the broker's broker.rack configuration setting. Requires protocol version to be at least v0.10.0.0.

SyncGroup returns a sync group response or error

TLSConnectionState returns the client's TLS connection state. The second return value is false if this is not a tls connection or the connection has not yet been established.

TxnOffsetCommit sends a request to commit transaction offsets and returns a response or error

ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.

type Client interface {
	
	
	Config() *Config

	
	
	
	Controller() (*Broker, error)

	
	
	RefreshController() (*Broker, error)

	
	Brokers() []*Broker

	
	Broker(brokerID int32) (*Broker, error)

	
	Topics() ([]string, error)

	
	Partitions(topic string) ([]int32, error)

	
	
	
	WritablePartitions(topic string) ([]int32, error)

	
	
	Leader(topic string, partitionID int32) (*Broker, error)

	
	
	LeaderAndEpoch(topic string, partitionID int32) (*Broker, int32, error)

	
	Replicas(topic string, partitionID int32) ([]int32, error)

	
	
	
	InSyncReplicas(topic string, partitionID int32) ([]int32, error)

	
	
	OfflineReplicas(topic string, partitionID int32) ([]int32, error)

	
	
	
	RefreshBrokers(addrs []string) error

	
	
	
	RefreshMetadata(topics ...string) error

	
	
	
	
	GetOffset(topic string, partitionID int32, time int64) (int64, error)

	
	
	
	
	Coordinator(consumerGroup string) (*Broker, error)

	
	
	RefreshCoordinator(consumerGroup string) error

	
	
	
	
	TransactionCoordinator(transactionID string) (*Broker, error)

	
	
	RefreshTransactionCoordinator(transactionID string) error

	
	InitProducerID() (*InitProducerIDResponse, error)

	
	LeastLoadedBroker() *Broker

	
	PartitionNotReadable(topic string, partition int32) bool

	
	
	
	
	Close() error

	
	Closed() bool
}

Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks, it will not be garbage-collected automatically when it passes out of scope. It is safe to share a client amongst many users, however Kafka will process requests from a single client strictly in serial, so it is generally more efficient to use the default one client per producer/consumer.

NewClient creates a new Client. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.

type ClusterAdmin interface {
	
	
	
	
	CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error

	
	ListTopics() (map[string]TopicDetail, error)

	
	DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)

	
	
	
	
	
	
	DeleteTopic(topic string) error

	
	
	
	
	
	
	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

	
	
	AlterPartitionReassignments(topic string, assignment [][]int32) error

	
	
	ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)

	
	
	DeleteRecords(topic string, partitionOffsets map[int32]int64) error

	
	
	
	
	
	
	
	DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)

	
	
	
	
	
	AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error

	
	
	
	
	IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error

	
	
	
	
	
	CreateACL(resource Resource, acl Acl) error

	
	
	
	
	CreateACLs([]*ResourceAcls) error

	
	
	
	ListAcls(filter AclFilter) ([]ResourceAcls, error)

	
	
	
	DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)

	
	ElectLeaders(ElectionType, map[string][]int32) (map[string]map[int32]*PartitionResult, error)

	
	ListConsumerGroups() (map[string]string, error)

	
	DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)

	
	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)

	
	DeleteConsumerGroupOffset(group string, topic string, partition int32) error

	
	DeleteConsumerGroup(group string) error

	
	DescribeCluster() (brokers []*Broker, controllerID int32, err error)

	
	DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)

	
	DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)

	
	DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)

	
	UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)

	
	
	DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)

	
	
	AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error

	
	
	Controller() (*Broker, error)

	
	
	Coordinator(group string) (*Broker, error)

	
	
	
	RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error)

	
	Close() error
}

ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker version required. You MUST call Close() on a client to avoid leaks

NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.

NewClusterAdminFromClient creates a new ClusterAdmin using the given client. Note that underlying client will also be closed on admin's Close() call.

type CompressionCodec int8

CompressionCodec represents the various compression codecs recognized by Kafka in messages.

const (
	
	CompressionNone CompressionCodec = iota
	
	CompressionGZIP
	
	CompressionSnappy
	
	CompressionLZ4
	
	CompressionZSTD

	
	
	
	CompressionLevelDefault = -1000
)

MarshalText transforms a CompressionCodec into its string representation.

UnmarshalText returns a CompressionCodec from its string representation.

Config is used to pass multiple configuration options to Sarama's constructors.

This example shows how to integrate with an existing registry as well as publishing metrics on the standard output

// Our application registry
appMetricRegistry := metrics.NewRegistry()
appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
appGauge.Update(1)

config := NewTestConfig()
// Use a prefix registry instead of the default local one
config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")

// Simulate a metric created by sarama without starting a broker
saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
saramaGauge.Update(2)

metrics.WriteOnce(appMetricRegistry, os.Stdout)
Output:

gauge m1
  value:               1
gauge sarama.m2
  value:               2

NewConfig returns a new configuration instance with sane defaults.

Validate checks a Config instance. It will return a ConfigurationError if the specified values don't make sense.

type ConfigResourceType int8

ConfigResourceType is a type for resources that have configs.

const (
	SourceUnknown ConfigSource = iota
	SourceTopic
	SourceDynamicBroker
	SourceDynamicDefaultBroker
	SourceStaticBroker
	SourceDefault
)
type ConfigurationError string

ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.

Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of scope.

This example shows how to use the consumer to read messages from a single partition.

consumer, err := NewConsumer([]string{"localhost:9092"}, NewTestConfig())
if err != nil {
	panic(err)
}

defer func() {
	if err := consumer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
if err != nil {
	panic(err)
}

defer func() {
	if err := partitionConsumer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

consumed := 0
ConsumerLoop:
for {
	select {
	case msg := <-partitionConsumer.Messages():
		log.Printf("Consumed message offset %d\n", msg.Offset)
		consumed++
	case <-signals:
		break ConsumerLoop
	}
}

log.Printf("Consumed: %d\n", consumed)

NewConsumer creates a new consumer using the given broker addresses and configuration.

NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer.

ConsumerError is what is provided to the user when an error occurs. It wraps an error and includes the topic and partition.

ConsumerErrors is a type that wraps a batch of errors and implements the Error interface. It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors when stopping.

ConsumerGroup is responsible for dividing up processing of topics and partitions over a collection of processes (the members of the consumer group).

//go:build !functional

package main

import (
	"context"
	"fmt"
)

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	config := NewTestConfig()
	config.Version = V2_0_0_0 // specify appropriate version
	config.Consumer.Return.Errors = true

	group, err := NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = group.Close() }()

	// Track errors
	go func() {
		for err := range group.Errors() {
			fmt.Println("ERROR", err)
		}
	}()

	// Iterate over consumer sessions.
	ctx := context.Background()
	for {
		topics := []string{"my-topic"}
		handler := exampleConsumerGroupHandler{}

		// `Consume` should be called inside an infinite loop, when a
		// server-side rebalance happens, the consumer session will need to be
		// recreated to get the new claims
		err := group.Consume(ctx, topics, handler)
		if err != nil {
			panic(err)
		}
	}
}

NewConsumerGroup creates a new consumer group the given broker addresses and configuration.

NewConsumerGroupFromClient creates a new consumer group using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer. PLEASE NOTE: consumer groups can only re-use but not share clients.

ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.

type ConsumerGroupHandler added in v1.19.0

ConsumerGroupHandler instances are used to handle individual topic/partition claims. It also provides hooks for your consumer group session life-cycle and allow you to trigger logic before or after the consume loop(s).

PLEASE NOTE that handlers are likely be called from several goroutines concurrently, ensure that all state is safely protected against race conditions.

ConsumerGroupSession represents a consumer group member session.

ConsumerMessage encapsulates a Kafka message returned by the consumer.

type ConsumerMetadataRequest struct {
	Version       int16
	ConsumerGroup string
}

ConsumerMetadataRequest is used for metadata requests

ConsumerMetadataResponse holds the response for a consumer group meta data requests

Control records are returned as a record by fetchRequest However unlike "normal" records, they mean nothing application wise. They only serve internal logic for supporting transactions.

type ControlRecordType int

ControlRecordType ...

type CoordinatorType int8

CreateAclsRequest is an acl creation request

CreateAclsResponse is a an acl response creation type

type DeleteAclsRequest struct {
	Version int
	Filters []*AclFilter
}

DeleteAclsRequest is a delete acl request

DeleteAclsResponse is a delete acl response

type DeleteGroupsRequest struct {
	Version int16
	Groups  []string
}
type DeleteOffsetsRequest struct {
	Version int16
	Group   string
	
}
type DeleteRecordsRequestTopic struct {
	PartitionOffsets map[int32]int64 
}
type DeleteRecordsResponsePartition struct {
	LowWatermark int64
	Err          KError
}

DescribeAclsRequest is a describe acl request type

DescribeAclsResponse is a describe acl response type

A filter to be applied to matching client quotas. Components: the components to filter on Strict: whether the filter only includes specified components

type DescribeConfigError struct {
	Err    KError
	ErrMsg string
}
type DescribeGroupsRequest struct {
	Version                     int16
	Groups                      []string
}

DescribeLogDirsRequest is a describe request to get partitions' log size

type DescribeLogDirsRequestTopic struct {
	Topic        string
	PartitionIDs []int32
}

DescribeLogDirsRequestTopic is a describe request about the log dir of one or more partitions within a Topic

type DescribeLogDirsResponsePartition struct {
	PartitionID int32

	
	Size int64

	
	
	OffsetLag int64

	
	
	IsTemporary bool
}

DescribeLogDirsResponsePartition describes a partition's log directory

DescribeLogDirsResponseTopic contains a topic's partitions descriptions

DescribeUserScramCredentialsRequest is a request to get list of SCRAM user names

type DescribeUserScramCredentialsRequestUser struct {
	Name string
}

DescribeUserScramCredentialsRequestUser is a describe request about specific user name

DynamicConsistencyPartitioner can optionally be implemented by Partitioners in order to allow more flexibility than is originally allowed by the RequiresConsistency method in the Partitioner interface. This allows partitioners to require consistency sometimes, but not all times. It's useful for, e.g., the HashPartitioner, which does not require consistency if the message key is nil.

type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().

type EndTxnRequest struct {
	Version           int16
	TransactionalID   string
	ProducerID        int64
	ProducerEpoch     int16
	TransactionResult bool
}

AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions

FilterResponse is a filter response type

This does the handshake for authorization

type GroupData struct {
	GroupState string 
}

GroupDescription contains each described group.

type GroupMember struct {
	
	MemberId string
	
	
	GroupInstanceId *string
	
	Metadata []byte
}

GroupMemberDescription contains the group members.

type GroupProtocol struct {
	
	Name string
	
	Metadata []byte
}
type HashPartitionerOption func(*hashPartitioner)

HashPartitionerOption lets you modify default values of the partitioner

WithAbsFirst means that the partitioner handles absolute values in the same way as the reference Java implementation

WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty

WithCustomHashFunction lets you specify what hash function to use for the partitioning

WithHashUnsigned means the partitioner treats the hashed value as unsigned when partitioning. This is intended to be combined with the crc32 hash algorithm to be compatible with librdkafka's implementation

type IncrementalAlterConfigsOperation int8
const (
	IncrementalAlterConfigsOperationSet IncrementalAlterConfigsOperation = iota
	IncrementalAlterConfigsOperationDelete
	IncrementalAlterConfigsOperationAppend
	IncrementalAlterConfigsOperationSubtract
)

IncrementalAlterConfigsRequest is an incremental alter config request type

IncrementalAlterConfigsResponse is a response type for incremental alter config

KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

const (
	ErrUnknown                            KError = -1 
	ErrNoError                            KError = 0  
	ErrOffsetOutOfRange                   KError = 1  
	ErrInvalidMessage                     KError = 2  
	ErrUnknownTopicOrPartition            KError = 3  
	ErrInvalidMessageSize                 KError = 4  
	ErrLeaderNotAvailable                 KError = 5  
	ErrNotLeaderForPartition              KError = 6  
	ErrRequestTimedOut                    KError = 7  
	ErrBrokerNotAvailable                 KError = 8  
	ErrReplicaNotAvailable                KError = 9  
	ErrMessageSizeTooLarge                KError = 10 
	ErrStaleControllerEpochCode           KError = 11 
	ErrOffsetMetadataTooLarge             KError = 12 
	ErrNetworkException                   KError = 13 
	ErrOffsetsLoadInProgress              KError = 14 
	ErrConsumerCoordinatorNotAvailable    KError = 15 
	ErrNotCoordinatorForConsumer          KError = 16 
	ErrInvalidTopic                       KError = 17 
	ErrMessageSetSizeTooLarge             KError = 18 
	ErrNotEnoughReplicas                  KError = 19 
	ErrNotEnoughReplicasAfterAppend       KError = 20 
	ErrInvalidRequiredAcks                KError = 21 
	ErrIllegalGeneration                  KError = 22 
	ErrInconsistentGroupProtocol          KError = 23 
	ErrInvalidGroupId                     KError = 24 
	ErrUnknownMemberId                    KError = 25 
	ErrInvalidSessionTimeout              KError = 26 
	ErrRebalanceInProgress                KError = 27 
	ErrInvalidCommitOffsetSize            KError = 28 
	ErrTopicAuthorizationFailed           KError = 29 
	ErrGroupAuthorizationFailed           KError = 30 
	ErrClusterAuthorizationFailed         KError = 31 
	ErrInvalidTimestamp                   KError = 32 
	ErrUnsupportedSASLMechanism           KError = 33 
	ErrIllegalSASLState                   KError = 34 
	ErrUnsupportedVersion                 KError = 35 
	ErrTopicAlreadyExists                 KError = 36 
	ErrInvalidPartitions                  KError = 37 
	ErrInvalidReplicationFactor           KError = 38 
	ErrInvalidReplicaAssignment           KError = 39 
	ErrInvalidConfig                      KError = 40 
	ErrNotController                      KError = 41 
	ErrInvalidRequest                     KError = 42 
	ErrUnsupportedForMessageFormat        KError = 43 
	ErrPolicyViolation                    KError = 44 
	ErrOutOfOrderSequenceNumber           KError = 45 
	ErrDuplicateSequenceNumber            KError = 46 
	ErrInvalidProducerEpoch               KError = 47 
	ErrInvalidTxnState                    KError = 48 
	ErrInvalidProducerIDMapping           KError = 49 
	ErrInvalidTransactionTimeout          KError = 50 
	ErrConcurrentTransactions             KError = 51 
	ErrTransactionCoordinatorFenced       KError = 52 
	ErrTransactionalIDAuthorizationFailed KError = 53 
	ErrSecurityDisabled                   KError = 54 
	ErrOperationNotAttempted              KError = 55 
	ErrKafkaStorageError                  KError = 56 
	ErrLogDirNotFound                     KError = 57 
	ErrSASLAuthenticationFailed           KError = 58 
	ErrUnknownProducerID                  KError = 59 
	ErrReassignmentInProgress             KError = 60 
	ErrDelegationTokenAuthDisabled        KError = 61 
	ErrDelegationTokenNotFound            KError = 62 
	ErrDelegationTokenOwnerMismatch       KError = 63 
	ErrDelegationTokenRequestNotAllowed   KError = 64 
	ErrDelegationTokenAuthorizationFailed KError = 65 
	ErrDelegationTokenExpired             KError = 66 
	ErrInvalidPrincipalType               KError = 67 
	ErrNonEmptyGroup                      KError = 68 
	ErrGroupIDNotFound                    KError = 69 
	ErrFetchSessionIDNotFound             KError = 70 
	ErrInvalidFetchSessionEpoch           KError = 71 
	ErrListenerNotFound                   KError = 72 
	ErrTopicDeletionDisabled              KError = 73 
	ErrFencedLeaderEpoch                  KError = 74 
	ErrUnknownLeaderEpoch                 KError = 75 
	ErrUnsupportedCompressionType         KError = 76 
	ErrStaleBrokerEpoch                   KError = 77 
	ErrOffsetNotAvailable                 KError = 78 
	ErrMemberIdRequired                   KError = 79 
	ErrPreferredLeaderNotAvailable        KError = 80 
	ErrGroupMaxSizeReached                KError = 81 
	ErrFencedInstancedId                  KError = 82 
	ErrEligibleLeadersNotAvailable        KError = 83 
	ErrElectionNotNeeded                  KError = 84 
	ErrNoReassignmentInProgress           KError = 85 
	ErrGroupSubscribedToTopic             KError = 86 
	ErrInvalidRecord                      KError = 87 
	ErrUnstableOffsetCommit               KError = 88 
	ErrThrottlingQuotaExceeded            KError = 89 
	ErrProducerFenced                     KError = 90 
)

Numeric error codes returned by the Kafka server.

type KafkaVersion struct {
	
}

KafkaVersion instances represent versions of the upstream Kafka broker.

ParseKafkaVersion parses and returns kafka version or error from a string

IsAtLeast return true if and only if the version it is called on is greater than or equal to the version passed in:

V1.IsAtLeast(V2) // false
V2.IsAtLeast(V1) // true

NewKerberosClient creates kerberos client used to obtain TGT and TGS tokens. It uses pure go Kerberos 5 solution (RFC-4121 and RFC-4120). uses gokrb5 library underlying which is a pure go kerberos client with some GSS-API capabilities.

func (*KerberosGoKrb5Client) Domain added in v1.40.0
type ListGroupsRequest struct {
	Version      int16
	StatesFilter []string 
}
type ListPartitionReassignmentsRequest struct {
	TimeoutMs int32

	Version int16
	
}

MatchingAcl is a matching acl type

type MemberIdentity struct {
	MemberId        string
	GroupInstanceId *string
}

Message is a kafka message type

Messages convenience helper which returns either all the messages that are wrapped in this block

type MessageSet struct {
	PartialTrailingMessage bool 
	OverflowMessage        bool 
	Messages               []*MessageBlock
}
type MetadataRequest struct {
	
	Version int16
	
	Topics []string
	
	AllowAutoTopicCreation             bool
	IncludeClusterAuthorizedOperations bool 
	IncludeTopicAuthorizedOperations   bool 
}
type MockAlterConfigsResponse struct {
	
}
type MockAlterConfigsResponseWithErrorCode struct {
	
}
type MockAlterPartitionReassignmentsResponse struct {
	
}
type MockApiVersionsResponse struct {
	
}
type MockBroker struct {
	
}

MockBroker is a mock Kafka broker that is used in unit tests. It is exposed to facilitate testing of higher level or specialized consumers and producers built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol, but rather provides a facility to do that. It takes care of the TCP transport, request unmarshalling, response marshaling, and makes it the test writer responsibility to program correct according to the Kafka API protocol MockBroker behavior.

MockBroker is implemented as a TCP server listening on a kernel-selected localhost port that can accept many connections. It reads Kafka requests from that connection and returns responses programmed by the SetHandlerByMap function. If a MockBroker receives a request that it has no programmed response for, then it returns nothing and the request times out.

A set of MockRequest builders to define mappings used by MockBroker is provided by Sarama. But users can develop MockRequests of their own and use them along with or instead of the standard ones.

When running tests with MockBroker it is strongly recommended to specify a timeout to `go test` so that if the broker hangs waiting for a response, the test panics.

It is not necessary to prefix message length or correlation ID to your response bytes, the server does that automatically as a convenience.

NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the test framework and a channel of responses to use. If an error occurs it is simply logged to the TestReporter and the broker exits.

NewMockBrokerAddr behaves like newMockBroker but listens on the address you give it rather than just some ephemeral port.

NewMockBrokerListener behaves like newMockBrokerAddr but accepts connections on the listener specified.

Addr returns the broker connection string in the form "<address>:<port>".

BrokerID returns broker ID assigned to the broker.

Close terminates the broker blocking until it stops internal goroutines and releases all resources.

History returns a slice of RequestResponse pairs in the order they were processed by the broker. Note that in case of multiple connections to the broker the order expected by a test can be different from the order recorded in the history, unless some synchronization is implemented in the test.

Port returns the TCP port number the broker is listening for requests on.

func (*MockBroker) SetHandlerByMap added in v1.9.0

SetHandlerByMap defines mapping of Request types to MockResponses. When a request is received by the broker, it looks up the request type in the map and uses the found MockResponse instance to generate an appropriate reply. If the request type is not found in the map then nothing is sent.

func (*MockBroker) SetHandlerFuncByMap added in v1.41.2
func (b *MockBroker) SetHandlerFuncByMap(handlerMap map[string]requestHandlerFunc)

SetHandlerFuncByMap defines mapping of Request types to RequestHandlerFunc. When a request is received by the broker, it looks up the request type in the map and invoke the found RequestHandlerFunc instance to generate an appropriate reply.

SetLatency makes broker pause for the specified period every time before replying.

SetNotifier set a function that will get invoked whenever a request has been processed successfully and will provide the number of bytes read and written

type MockConsumerMetadataResponse struct {
	
}

MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.

type MockCreateAclsResponse struct {
	
}
type MockCreateAclsResponseError struct {
	
}
type MockCreatePartitionsResponse struct {
	
}
type MockCreateTopicsResponse struct {
	
}
type MockDeleteAclsResponse struct {
	
}
type MockDeleteGroupsResponse struct {
	
}
type MockDeleteOffsetResponse struct {
	
}
type MockDeleteRecordsResponse struct {
	
}
type MockDeleteTopicsResponse struct {
	
}
type MockDescribeConfigsResponse struct {
	
}
type MockDescribeConfigsResponseWithErrorCode struct {
	
}
type MockDescribeGroupsResponse struct {
	
}
type MockDescribeLogDirsResponse struct {
	
}
type MockElectLeadersResponse struct {
	
}
type MockFetchResponse struct {
	
}

MockFetchResponse is a `FetchResponse` builder.

type MockFindCoordinatorResponse struct {
	
}

MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.

type MockHeartbeatResponse struct {
	Err KError
	
}
type MockIncrementalAlterConfigsResponse struct {
	
}
type MockIncrementalAlterConfigsResponseWithErrorCode struct {
	
}
type MockInitProducerIDResponse struct {
	
}

MockInitProducerIDResponse is an `InitPorducerIDResponse` builder.

func (*MockKerberosClient) Domain added in v1.40.0
type MockLeaveGroupResponse struct {
	Err KError
	
}
type MockListAclsResponse struct {
	
}
type MockListGroupsResponse struct {
	
}
type MockListPartitionReassignmentsResponse struct {
	
}
type MockMetadataResponse struct {
	
}

MockMetadataResponse is a `MetadataResponse` builder.

type MockOffsetCommitResponse struct {
	
}

MockOffsetCommitResponse is a `OffsetCommitResponse` builder.

type MockOffsetFetchResponse struct {
	
}

MockOffsetFetchResponse is a `OffsetFetchResponse` builder.

type MockOffsetResponse struct {
	
}

MockOffsetResponse is an `OffsetResponse` builder.

type MockProduceResponse struct {
	
}

MockProduceResponse is a `ProduceResponse` builder.

type MockResponse interface {
	For(reqBody versionedDecoder) (res encoderWithHeader)
}

MockResponse is a response builder interface it defines one method that allows generating a response based on a request body. MockResponses are used to program behavior of MockBroker in tests.

type MockSaslAuthenticateResponse struct {
	
}
type MockSaslHandshakeResponse added in v1.40.0
type MockSaslHandshakeResponse struct {
	
}
func (*MockSaslHandshakeResponse) For added in v1.40.0 func (*MockSaslHandshakeResponse) SetError added in v1.40.0
type MockSequence struct {
	
}

MockSequence is a mock response builder that is created from a sequence of concrete responses. Every time when a `MockBroker` calls its `For` method the next response from the sequence is returned. When the end of the sequence is reached the last element from the sequence is returned.

func NewMockSequence(responses ...interface{}) *MockSequence
func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader)
type MockSyncGroupResponse struct {
	Err              KError
	MemberAssignment []byte
	
}
type MockWrapper struct {
	
}

MockWrapper is a mock response builder that returns a particular concrete response regardless of the actual request passed to the `For` method.

func NewMockWrapper(res encoderWithHeader) *MockWrapper
func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader)
type OffsetFetchRequest struct {
	Version       int16
	ConsumerGroup string
	RequireStable bool 
	
}

OffsetManager uses Kafka to store and fetch consumed partition offsets.

NewOffsetManagerFromClient creates a new OffsetManager from the given client. It is still necessary to call Close() on the underlying client when finished with the partition manager.

type OwnedPartition struct {
	Topic      string
	Partitions []int32
}
type PacketDecodingError struct {
	Info string
}

PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.

type PacketEncodingError struct {
	Info string
}

PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.

PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.

The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported as out of range by the brokers. In this case you should decide what you want to do (try a different offset, notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying. By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.

To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will also drain the Messages channel, harvest all errors & return them once cleanup has completed.

type PartitionError struct {
	Partition int32
	Err       KError
}

PartitionError is a partition error type

PartitionMetadata contains each partition in the topic.

PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close() on a partition offset manager to avoid leaks, it will not be garbage-collected automatically when it passes out of scope.

type PartitionReplicaReassignmentsStatus struct {
	Replicas         []int32
	AddingReplicas   []int32
	RemovingReplicas []int32
}
type PartitionResult struct {
	ErrorCode    KError
	ErrorMessage *string
}

Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.

This example shows how to assign partitions to your messages manually.

config := NewTestConfig()

// First, we tell the producer that we are going to partition ourselves.
config.Producer.Partitioner = NewManualPartitioner

producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	log.Println(err)
	return
}
defer func() {
	if err := producer.Close(); err != nil {
		log.Println("Failed to close producer:", err)
	}
}()

// Now, we set the Partition field of the ProducerMessage struct.
msg := &ProducerMessage{Topic: "test", Partition: 6, Value: StringEncoder("test")}

partition, offset, err := producer.SendMessage(msg)
if err != nil {
	log.Println("Failed to produce message to kafka cluster.")
	return
}

if partition != 6 {
	log.Println("Message should have been produced to partition 6!")
	return
}

log.Printf("Produced message to partition %d with offset %d", partition, offset)

This example shows how to set a different partitioner depending on the topic.

config := NewTestConfig()
config.Producer.Partitioner = func(topic string) Partitioner {
	switch topic {
	case "access_log", "error_log":
		return NewRandomPartitioner(topic)

	default:
		return NewHashPartitioner(topic)
	}
}

// ...

By default, Sarama uses the message's key to consistently assign a partition to a message using hashing. If no key is set, a random partition will be chosen. This example shows how you can partition messages randomly, even when a key is set, by overriding Config.Producer.Partitioner.

config := NewTestConfig()
config.Producer.Partitioner = NewRandomPartitioner

producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	log.Println(err)
	return
}
defer func() {
	if err := producer.Close(); err != nil {
		log.Println("Failed to close producer:", err)
	}
}()

msg := &ProducerMessage{Topic: "test", Key: StringEncoder("key is set"), Value: StringEncoder("test")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
	log.Println("Failed to produce message to kafka cluster.")
	return
}

log.Printf("Produced message to partition %d with offset %d", partition, offset)

NewConsistentCRCHashPartitioner is like NewHashPartitioner execpt that it uses the *unsigned* crc32 hash of the encoded bytes of the message key modulus the number of partitions. This is compatible with librdkafka's `consistent_random` partitioner

NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used, modulus the number of partitions. This ensures that messages with the same key always end up on the same partition.

NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided ProducerMessage's Partition field as the partition to produce to.

func NewRandomPartitioner

NewRandomPartitioner returns a Partitioner which chooses a random partition each time.

NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values in the same way as the reference Java implementation. NewHashPartitioner was supposed to do that but it had a mistake and now there are people depending on both behaviors. This will all go away on the next major version bump.

NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.

PartitionerConstructor is the type for a function capable of constructing new Partitioners.

NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher. The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.

NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options

ProduceCallback function is called once the produce response has been parsed or could not be read.

partition_responses in protocol

ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.

ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface. It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel when closing a producer.

ProducerMessage is the collection of elements passed to the Producer in order to send a message.

type ProducerTxnStatusFlag int16

ProducerTxnStatusFlag mark current transaction status.

const (
	
	ProducerTxnFlagUninitialized ProducerTxnStatusFlag = 1 << iota
	
	ProducerTxnFlagInitializing
	
	ProducerTxnFlagReady
	
	ProducerTxnFlagInTransaction
	
	ProducerTxnFlagEndTransaction
	
	ProducerTxnFlagInError
	
	ProducerTxnFlagCommittingTransaction
	
	ProducerTxnFlagAbortingTransaction
	
	
	ProducerTxnFlagAbortableError
	
	
	ProducerTxnFlagFatalError
)

Describe a component for applying a client quota filter. EntityType: the entity type the filter component applies to ("user", "client-id", "ip") MatchType: the match type of the filter component (any, exact, default) Match: the name that's matched exactly (used when MatchType is QuotaMatchExact)

Record is kafka record type

type RecordHeader struct {
}

RecordHeader stores key and value for a record header

Records implements a union type containing either a RecordBatch or a legacy MessageSet.

type RequestNotifierFunc func(bytesRead, bytesWritten int)

RequestNotifierFunc is invoked when a mock broker processes a request successfully and will provides the number of bytes read and written.

type RequestResponse struct {
	Request  protocolBody
	Response encoder
}

RequestResponse represents a Request/Response pair processed by MockBroker.

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.isr` value in the brokers configuration).

Resource holds information about acl resource type

ResourceAcls is an acl resource type

SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker

SCRAMClient is a an interface to a SCRAM client implementation.

type SaslAuthenticateRequest struct {
	
	Version       int16
	SaslAuthBytes []byte
}
type SaslAuthenticateResponse struct {
	
	Version           int16
	Err               KError
	ErrorMessage      *string
	SaslAuthBytes     []byte
	SessionLifetimeMs int64
}
type ScramMechanismType int8
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

StdLogger is used to log error messages.

DebugLogger is the instance of a StdLogger that Sarama writes more verbose debug information to. By default it is set to redirect all debug to the default Logger above, but you can optionally set it to another StdLogger instance to (e.g.,) discard debug information

type StickyAssignorUserData interface {
	
}
type StickyAssignorUserDataV0 struct {
	Topics map[string][]int32
	
}

StickyAssignorUserDataV0 holds topic partition information for an assignment

type StickyAssignorUserDataV1 struct {
	Topics     map[string][]int32
	Generation int32
	
}

StickyAssignorUserDataV1 holds topic partition information for an assignment

StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.

type SyncGroupRequestAssignment struct {
	
	MemberId string
	
	Assignment []byte
}
type SyncGroupResponse struct {
	
	Version int16
	
	
	
	ThrottleTime int32
	
	Err KError
	
	MemberAssignment []byte
}

SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.

The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.

For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to be set to true in its configuration.

This example shows the basic usage pattern of the SyncProducer.

producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
	log.Fatalln(err)
}
defer func() {
	if err := producer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
	log.Printf("FAILED to send message: %s\n", err)
} else {
	log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}

NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.

NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.

type TestReporter interface {
	Error(...interface{})
	Errorf(string, ...interface{})
	Fatal(...interface{})
	Fatalf(string, ...interface{})
	Helper()
}

TestReporter has methods matching go's testing.T to avoid importing `testing` in the main part of the library.

TopicMetadata contains each topic in the response.

type TopicPartition struct {
	Count      int32
	Assignment [][]int32
}
type TopicPartitionError struct {
	Err    KError
	ErrMsg *string
}
type ZstdDecoderParams struct {
}
type ZstdEncoderParams struct {
	Level int
}

RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4