kgo package - github.com/twmb/franz-go/pkg/kgo - Go Packages

Package kgo provides a pure Go efficient Kafka client for Kafka 0.8+ with support for transactions, regex topic consuming, the latest partition strategies, and more. This client supports all client related KIPs.

This client aims to be simple to use while still interacting with Kafka in a near ideal way. For a broader overview of the entire client, please see the README on the project's GitHub page.

var (
	ErrRecordTimeout = errors.New("records have timed out before they were able to be produced")

	ErrRecordRetries = errors.New("record failed after being retried too many times")

	ErrMaxBuffered = errors.New("the maximum amount of records are buffered, cannot buffer more")

	ErrAborting = errors.New("client is aborting buffered records")

	ErrClientClosed = errors.New("client closed")
)

IsRetryableBrokerErr returns whether the client considers an error from a broker retryable. This returns true specifically if the client thinks it can retry whatever it was just trying to do with a broker. It returns false in all other cases.

This can be used externally to the library to help filter errors when using kgo hooks: errors may be sent to hooks before the client retries whatever it was just attempting.

NodeName returns the name of a node, given the kgo internal node ID.

Internally, seed brokers are stored with very negative node IDs, and these node IDs are visible in the BrokerMetadata struct. You can use NodeName to convert the negative node ID into "seed_#". Brokers discovered through metadata responses have standard non-negative numbers and this function just returns the number as a string.

ParseConsumerSyncAssignment returns an assignment as specified by a kmsg.ConsumerMemberAssignment, that is, the type encoded in metadata for the consumer protocol.

PreCommitFnContext attaches fn to the context through WithValue. Using the context while committing allows fn to be called just before the commit is issued. This can be used to modify the actual commit, such as by associating metadata with partitions. If fn returns an error, the commit is not attempted.

PreTxnCommitFnContext attaches fn to the context through WithValue. Using the context while committing a transaction allows fn to be called just before the commit is issued. This can be used to modify the actual commit, such as by associating metadata with partitions (for transactions, the default internal metadata is the client's current member ID). If fn returns an error, the commit is not attempted. This context can be used in either GroupTransactSession.End or in Client.EndTransaction.

ValidateOpts returns an error if the options are invalid.

Acks represents the number of acks a broker leader must have before a produce request is considered complete.

This controls the durability of written records and corresponds to "acks" in Kafka's Producer Configuration documentation.

The default is LeaderAck.

AllISRAcks ensures that all in-sync replicas have acknowledged they wrote a record before the leader replies success.

LeaderAck causes Kafka to reply that a record is written after only the leader has written a message. The leader does not wait for in-sync replica replies.

NoAck considers records sent as soon as they are written on the wire. The leader does not reply to records.

type BalancePlan struct {
	// contains filtered or unexported fields
}

BalancePlan is a helper type to build the result of balancing topics and partitions among group members.

AddPartition assigns a partition for the topic to a given member.

AddPartitions assigns many partitions for a topic to a given member.

AdjustCooperative performs the final adjustment to a plan for cooperative balancing.

Over the plan, we remove all partitions that migrated from one member (where it was assigned) to a new member (where it is now planned).

This allows members that had partitions removed to revoke and rejoin, which will then do another rebalance, and in that new rebalance, the planned partitions are now on the free list to be assigned.

AsMemberIDMap returns the plan as a map of member IDs to their topic & partition assignments.

Internally, a BalancePlan is currently represented as this map. Any modification to the map modifies the plan. The internal representation of a plan may change in the future to include more metadata. If this happens, the map returned from this function may not represent all aspects of a plan. The client will attempt to mirror modifications to the map directly back into the underlying plan as best as possible.

IntoSyncAssignment satisfies the IntoSyncAssignment interface.

Broker pairs a broker ID with a client to directly issue requests to a specific broker.

Request issues a request to a broker. If the broker does not exist in the client, this returns an unknown broker error. Requests are not retried.

The passed context can be used to cancel a request and return early. Note that if the request is not canceled before it is written to Kafka, you may just end up canceling and not receiving the response to what Kafka inevitably does.

It is more beneficial to always use RetriableRequest.

RetriableRequest issues a request to a broker the same as Request, but retries in the face of retryable broker connection errors. This does not retry on response internal errors.

BrokerE2E tracks complete information for a write of a request followed by a read of that request's response.

Note that if this is for a produce request with no acks, there will be no read wait / time to read.

DurationE2E returns the e2e time from the start of when a request is written to the end of when the response for that request was fully read. If a write or read error occurs, this hook is called with all information possible at the time (e.g., if a write error occurs, all write info is specified).

Kerberos SASL does not cause this hook, since it directly reads from the connection.

Err returns the first of either the write err or the read err. If this return is non-nil, the request/response had an error.

BrokerMetadata is metadata for a broker.

This struct mirrors kmsg.MetadataResponseBroker.

Client issues requests and handles responses to a Kafka cluster.

NewClient returns a new Kafka client with the given options or an error if the options are invalid. Connections to brokers are lazily created only when requests are written to them.

By default, the client uses the latest stable request versions when talking to Kafka. If you use a broker older than 0.10.0, then you need to manually set a MaxVersions option. Otherwise, there is usually no harm in defaulting to the latest API versions, although occasionally Kafka introduces new required parameters that do not have zero value defaults.

NewClient also launches a goroutine which periodically updates the cached topic metadata.

AbortBufferedRecords fails all unflushed records with ErrAborting and waits for there to be no buffered records.

This accepts a context to quit the wait early, but quitting the wait may lead to an invalid state and should only be used if you are quitting your application. This function waits to abort records at safe points: when records are known to not be in flight. This function is safe to call multiple times concurrently, and safe to call concurrent with Flush.

NOTE: This aborting record waits until all inflight requests have known responses. The client must wait to ensure no duplicate sequence number issues. For more details, and for an immediate alternative, check the documentation on UnsafeAbortBufferedRecords.

AddConsumePartitions adds new partitions to be consumed at the given offsets. This function works only for direct, non-regex consumers.

AddConsumeTopics adds new topics to be consumed. This function is a no-op if the client is configured to consume via regex.

Note that if you are directly consuming and specified ConsumePartitions, this function will not add the rest of the partitions for a topic unless the topic has been previously purged. That is, if you directly consumed only one of five partitions originally, this will not add the other four until the entire topic is purged.

func (cl *Client) AllowRebalance()

AllowRebalance allows a consumer group to rebalance if it was blocked by you polling records in tandem with the BlockRebalanceOnPoll option.

You can poll many times before calling this function; this function internally resets the poll count and allows any blocked rebalances to continue. Rebalances take priority: if a rebalance is blocked, and you allow rebalances and then immediately poll, your poll will be blocked until the rebalance completes. Internally, this function simply waits for lost partitions to stop being fetched before allowing you to poll again.

BeginTransaction sets the client to a transactional state, erroring if there is no transactional ID, or if the producer is currently in a fatal (unrecoverable) state, or if the client is already in a transaction.

This must not be called concurrently with other client functions.

Broker returns a handle to a specific broker to directly issue requests to. Note that there is no guarantee that this broker exists; if it does not, requests will fail with an unknown broker error.

BufferedFetchBytes returns the number of bytes currently buffered from fetching within the client. This is the sum of all keys, values, and header keys/values. See the related [BufferedFetchRecords] for more information.

BufferedFetchRecords returns the number of records currently buffered from fetching within the client.

This can be used as a gauge to determine how far behind your application is in processing records the client has fetched. Note that it is perfectly normal to see a spike of buffered records, which would correspond to a fetch response being processed just before a call to this function. It is only problematic if this function consistently returns large values.

BufferedProduceBytes returns the number of bytes currently buffered for producing within the client. This is the sum of all keys, values, and header keys/values. See the related [BufferedProduceRecords] for more information.

BufferedProduceRecords returns the number of records currently buffered for producing within the client.

This can be used as a gauge to determine how far behind the client is for flushing records produced by your client (which can help determine network / cluster health).

Close leaves any group and closes all connections and goroutines. This function waits for the group to be left. If you want to force leave a group immediately and ensure a speedy shutdown you can use LeaveGroupContext first (and then Close will be immediate).

If you are group consuming and have overridden the default OnPartitionsRevoked, you must manually commit offsets before closing the client.

If you are using the BlockRebalanceOnPoll option and have polled, this function does not automatically allow rebalancing. You must AllowRebalance before calling this function. Internally, this function leaves the group, and leaving a group causes a rebalance so that you can get one final notification of revoked partitions. If you want to automatically allow rebalancing, use CloseAllowingRebalance.

If you are using static membership, Close will NOT send a leave group request. See InstanceID for more details.

func (cl *Client) CloseAllowingRebalance()

CloseAllowingRebalance allows rebalances, leaves any group, and closes all connections and goroutines. This function is only useful if you are using the BlockRebalanceOnPoll option. Close itself does not allow rebalances and will hang if you polled, did not allow rebalances, and want to close. Close does not automatically allow rebalances because leaving a group causes a revoke, and the client does not assume that the final revoke is concurrency safe. The CloseAllowingRebalance function exists as a shortcut to opt into allowing rebalances while closing.

If you are using static membership, CloseAllowingRebalance will NOT send a leave group request. See InstanceID for more details.

CommitMarkedOffsets issues a synchronous offset commit for any partition that has been consumed from that has marked offsets. Retryable errors are retried up to the configured retry limit, and any unretryable error is returned.

This function is only useful if you have marked offsets with MarkCommitRecords when using AutoCommitMarks, otherwise this is a no-op.

The recommended pattern for using this function is to have a poll / process / commit loop. First PollFetches, then process every record, call MarkCommitRecords for the records you wish to commit, and then call CommitMarkedOffsets.

As an alternative if you want to commit specific records, see CommitRecords.

CommitOffsets commits the given offsets for a group, calling onDone with the commit request and either the response or an error if the response was not issued. If uncommitted is empty or the client is not consuming as a group, onDone is called with (nil, nil, nil) and this function returns immediately. It is OK if onDone is nil, but you will not know if your commit succeeded.

This is an advanced function and is difficult to use correctly. For simpler, more easily understandable committing, see CommitRecords and CommitUncommittedOffsets.

This function itself does not wait for the commit to finish. By default, this function is an asynchronous commit. You can use onDone to make it sync. If autocommitting is enabled, this function blocks autocommitting until this function is complete and the onDone has returned.

It is invalid to use this function to commit offsets for a transaction.

Note that this function ensures absolute ordering of commit requests by canceling prior requests and ensuring they are done before executing a new one. This means, for absolute control, you can use this function to periodically commit async and then issue a final sync commit before quitting (this is the behavior of autocommitting and using the default revoke). This differs from the Java async commit, which does not retry requests to avoid trampling on future commits.

It is highly recommended to check the response's partition's error codes if the response is non-nil. While unlikely, individual partitions can error. This is most likely to happen if a commit occurs too late in a rebalance event.

Do not use this async CommitOffsets in OnPartitionsRevoked, instead use CommitOffsetsSync. If you commit async, the rebalance will proceed before this function executes, and you will commit offsets for partitions that have moved to a different consumer.

CommitOffsetsSync cancels any active CommitOffsets, begins a commit that cannot be canceled, and waits for that commit to complete. This function will not return until the commit is done and the onDone callback is complete.

The purpose of this function is for use in OnPartitionsRevoked or committing before leaving a group, because you do not want to have a commit issued in OnPartitionsRevoked canceled.

This is an advanced function, and for simpler, more easily understandable committing, see CommitRecords and CommitUncommittedOffsets.

For more information about committing and committing asynchronously, see CommitOffsets.

CommitRecords issues a synchronous offset commit for the offsets contained within rs. Retryable errors are retried up to the configured retry limit, and any unretryable error is returned.

This function is useful as a simple way to commit offsets if you have disabled autocommitting. As an alternative if you always want to commit everything, see CommitUncommittedOffsets.

Simple usage of this function may lead to duplicate records if a consumer group rebalance occurs before or while this function is being executed. You can avoid this scenario by calling CommitRecords in a custom OnPartitionsRevoked, but for most workloads, a small bit of potential duplicate processing is fine. See the documentation on DisableAutoCommit for more details. You can also avoid this problem by using BlockRebalanceOnPoll, but that option comes with its own tradeoffs (refer to its documentation).

It is recommended to always commit records in order (per partition). If you call this function first with a record for partition 0 at offset 999, and then with a record for partition 0 at offset 4, you will rewind your commit.

A use case for this function may be to partially process a batch of records, commit, and then continue to process the rest of the records. It is not recommended to call this for every record processed in a high throughput scenario, because you do not want to unnecessarily increase load on Kafka.

If you do not want to wait for this function to complete before continuing processing records, you can call this function in a goroutine.

CommitUncommittedOffsets issues a synchronous offset commit for any partition that has been consumed from that has uncommitted offsets. Retryable errors are retried up to the configured retry limit, and any unretryable error is returned.

The recommended pattern for using this function is to have a poll / process / commit loop. First PollFetches, then process every record, then call CommitUncommittedOffsets.

As an alternative if you want to commit specific records, see CommitRecords.

CommittedOffsets returns the latest committed offsets. Committed offsets are updated from commits or from joining a group and fetching offsets.

If there are no committed offsets, this returns nil.

Context returns the internal context used wherever possible in the client. By default this is context.WithCancel(context.Background()). You may override the background context with your own via WithContext. The context is occasionally wrapped further internally in client subsystems.

DiscoveredBrokers returns all brokers that were discovered from prior metadata responses. This does not actually issue a metadata request to load brokers; if you wish to ensure this returns all brokers, be sure to manually issue a metadata request before this. This also does not include seed brokers, which are internally saved under special internal broker IDs (but, it does include those brokers under their normal IDs as returned from a metadata response).

func (*Client) EndAndBeginTransaction (added in v1.4.0)

EndAndBeginTransaction is a combination of Flush, EndTransaction, and BeginTransaction. You cannot concurrently produce during this function.

The onEnd function is called with your input context and the result of Flush, or EndTransaction if Flush is successful. If onEnd returns an error, BeginTransaction is not called and this function returns the result of onEnd. Otherwise, this function returns the result of BeginTransaction. See the documentation on EndTransaction and BeginTransaction for further details. It is invalid to call this function more than once at a time, and it is invalid to call concurrent with EndTransaction or BeginTransaction.

This function used to serve more purpose, allowing you to produce concurrently while calling this and avoiding flushing, but the internal optimizations are no longer valid as of Kafka 4.0 due to KIP-890 changing some internal semantics.

EndTransaction ends a transaction and resets the client's internal state to not be in a transaction.

Flush and CommitOffsetsForTransaction must be called before this function; this function does not flush and does not itself ensure that all buffered records are flushed. If no record yet has caused a partition to be added to the transaction, this function does nothing and returns nil. Alternatively, AbortBufferedRecords should be called before aborting a transaction to ensure that any buffered records not yet flushed will not be a part of a new transaction.

If the producer ID has an error and you are trying to commit, this will return with kerr.OperationNotAttempted. If this happened, retry EndTransaction with TryAbort. If this returns kerr.TransactionAbortable, you can retry with TryAbort. You should not retry this function on any other error.

It may be possible for the client to recover in a new transaction via BeginTransaction if an error is returned from this function:

Note that canceling the context will likely leave the client in an undesirable state, because canceling the context may cancel the in-flight EndTransaction request, making it impossible to know whether the commit or abort was successful. It is recommended to not cancel the context.

EnsureProduceConnectionIsOpen attempts to open a produce connection to all specified brokers, or all brokers if `brokers` is empty or contains -1.

This can be used in an attempt to reduce the latency when producing if your application produces infrequently: you can force open a produce connection a bit earlier than you intend to produce, rather than at the moment you produce. In rare circumstances, it is possible that a connection that was ensured to be open may close before you produce.

This returns an errors.Join'd error that merges a message for all brokers that failed to be opened as well as why.

Flush hangs waiting for all buffered records to be flushed, stopping all lingers if necessary.

If the context finishes (Done), this returns the context's error.

This function is safe to call multiple times concurrently, and safe to call concurrent with Flush.

func (cl *Client) ForceMetadataRefresh()

ForceMetadataRefresh triggers the client to update the metadata that is currently used for producing & consuming.

Internally, the client already properly triggers metadata updates whenever a partition is discovered to be out of date (leader moved, epoch is old, etc). However, when partitions are added to a topic through a CreatePartitions request, it may take up to MetadataMaxAge for the new partitions to be discovered. In this case, you may want to forcefully refresh metadata manually to discover these new partitions sooner.

func (cl *Client) ForceRebalance()

ForceRebalance quits a group member's heartbeat loop so that the member rejoins with a JoinGroupRequest.

This function is only useful if you either (a) know that the group member is a leader, and want to force a rebalance for any particular reason, or (b) are using a custom group balancer, and have changed the metadata that will be returned from its JoinGroupMetadata method. This function has no other use; see KIP-568 for more details around this function's motivation.

If neither of the cases above are true (this member is not a leader, and the join group metadata has not changed), then Kafka will not actually trigger a rebalance and will instead reply to the member with its current assignment.

GetConsumeTopics retrieves a list of current topics being consumed.

GroupMetadata returns the current group member ID and generation, or an empty string and -1 if not in the group.

func (cl *Client) LeaveGroup()

LeaveGroup leaves a group. Close automatically leaves the group, so this is only necessary to call if you plan to leave the group but continue to use the client. If a rebalance is in progress, this function waits for the rebalance to complete before the group can be left. This is necessary to allow you to safely issue one final offset commit in OnPartitionsRevoked. If you have overridden the default revoke, you must manually commit offsets before leaving the group.

If you have configured the group with an InstanceID, this does not leave the group. With instance IDs, it is expected that clients will restart and re-use the same instance ID. To leave a group using an instance ID, you must manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka scripts or kcl).

It is recommended to use LeaveGroupContext to see if the leave was successful.

LeaveGroupContext leaves a group. Close automatically leaves the group, so this is only necessary to call if you plan to leave the group but continue to use the client. If a rebalance is in progress, this function waits for the rebalance to complete before the group can be left. This is necessary to allow you to safely issue one final offset commit in OnPartitionsRevoked. If you have overridden the default revoke, you must manually commit offsets before leaving the group.

The context can be used to avoid waiting for the client to leave the group. Not waiting may result in your client being stuck in the group and the partitions this client was consuming being stuck until the session timeout. This function returns any leave group error or context cancel error. If the context is nil, this immediately leaves the group and does not wait and does not return an error.

If you have configured the group with an InstanceID, this does not leave the group. With instance IDs, it is expected that clients will restart and re-use the same instance ID. To leave a group using an instance ID, you must manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka scripts or kcl).

MarkCommitOffsets marks offsets to be available for autocommitting. This function is only useful if you use the AutoCommitMarks config option; see the documentation on that option for more details. This function does not allow rewinds.

MarkCommitRecords marks records to be available for autocommitting. This function is only useful if you use the AutoCommitMarks config option; see the documentation on that option for more details. This function does not allow rewinds.

MarkedOffsets returns the latest marked offsets. When autocommitting, a marked offset is an offset that can be committed, in comparison to a dirty offset that cannot yet be committed. MarkedOffsets returns nil if you are not using AutoCommitMarks.

OptValue returns the value for the given configuration option. If the given option does not exist, this returns nil. This function takes either a raw Opt, or an Opt function name.

If a configuration option has multiple inputs, this function returns only the first input. If the function is a boolean function (such as BlockRebalanceOnPoll), this function returns the value of the internal bool. Variadic option inputs are returned as a single slice. Options that are internally stored as a pointer (ClientID, TransactionalID, and InstanceID) are returned as their string input; you can see if the option is internally nil by looking at the second value returned from OptValues.

var (
	cl, _  = NewClient(
		InstanceID("foo"),
		ConsumeTopics("foo", "bar"),
	)
	iid    = cl.OptValue(InstanceID)           // iid is "foo"
	gid    = cl.OptValue(ConsumerGroup)        // gid is "" since groups are not used
	topics = cl.OptValue("ConsumeTopics")      // topics is []string{"foo", "bar"}; string lookup for the option works
	bpoll  = cl.OptValue(BlockRebalanceOnPoll) // bpoll is false
	t      = cl.OptValue(SessionTimeout)       // t is 45s, the internal default
	td     = t.(time.Duration)                 // safe conversion since SessionTimeout's input is a time.Duration
	unk    = cl.OptValue("Unknown")            // unk is nil
)

OptValues returns all values for options. This method is useful for options that have multiple inputs (notably, SoftwareNameAndVersion). This is also useful for options that are internally stored as a pointer (ClientID, TransactionalID, and InstanceID) -- this function will return the string value of the option but also whether the option is non-nil. Boolean options are returned as a single-element slice with the bool value. Variadic inputs are returned as a single slice. If the input option does not exist, this returns nil.

var (
	cl, _    = NewClient(
		InstanceID("foo"),
		ConsumeTopics("foo", "bar"),
	)
	idValues = cl.OptValues(InstanceID)           // idValues is []any{"foo", true}
	tValues  = cl.OptValues(SessionTimeout)       // tValues is []any{45 * time.Second}
	topics   = cl.OptValues(ConsumeTopics)        // topics is []any{[]string{"foo", "bar"}}
	bpoll    = cl.OptValues(BlockRebalanceOnPoll) // bpoll is []any{false}
	unknown  = cl.OptValues("Unknown")            // unknown is nil
)

Opts returns the options that were used to create this client. This can be as a base to generate a new client, where you can add override options to the end of the original input list. If you want to know a specific option value, you can use OptValue or OptValues.

PartitionLeader returns the given topic partition's leader, leader epoch and load error. This returns -1, -1, nil if the partition has not been loaded.

PauseFetchPartitions sets the client to no longer fetch the given partitions and returns all currently paused partitions. Paused partitions persist until resumed. You can call this function with no partitions to simply receive the list of currently paused partitions.

Pausing individual partitions is independent from pausing topics with the PauseFetchTopics method. If you pause partitions for a topic with PauseFetchPartitions, and then pause that same topic with PauseFetchTopics, the individually paused partitions will not be unpaused if you only call ResumeFetchTopics.

PauseFetchTopics sets the client to no longer fetch the given topics and returns all currently paused topics. Paused topics persist until resumed. You can call this function with no topics to simply receive the list of currently paused topics.

Pausing topics is independent from pausing individual partitions with the PauseFetchPartitions method. If you pause partitions for a topic with PauseFetchPartitions, and then pause that same topic with PauseFetchTopics, the individually paused partitions will not be unpaused if you only call ResumeFetchTopics.

Ping returns whether any broker is reachable and that the client can communicate with it, iterating over any discovered broker or seed broker until one returns a successful response to a broker-only Metadata request. No discovered broker nor seed broker is attempted more than once. If all requests fail, this returns the final error.

PollFetches waits for fetches to be available, returning as soon as any broker returns a fetch. If the context is nil, this function will return immediately with any currently buffered records.

If the client is closed, a fake fetch will be injected that has no topic, a partition of -1, and a partition error of ErrClientClosed. If the context is canceled, a fake fetch will be injected with ctx.Err. These injected errors can be used to break out of a poll loop.

It is important to check all partition errors in the returned fetches. If any partition has a fatal error and actually had no records, a fake fetch will be injected with the error.

If you are group consuming, a rebalance can happen under the hood while you process the returned fetches. This can result in duplicate work, and you may accidentally commit to partitions that you no longer own. You can prevent this by using BlockRebalanceOnPoll, but this comes with different tradeoffs. See the documentation on BlockRebalanceOnPoll for more information.

PollRecords waits for records to be available, returning as soon as any broker returns records in a fetch. If the context is nil, this function will return immediately with any currently buffered records.

If the client is closed, a fake fetch will be injected that has no topic, a partition of -1, and a partition error of ErrClientClosed. If the context is canceled, a fake fetch will be injected with ctx.Err. These injected errors can be used to break out of a poll loop.

This returns a maximum of maxPollRecords total across all fetches, or returns all buffered records if maxPollRecords is <= 0.

It is important to check all partition errors in the returned fetches. If any partition has a fatal error and actually had no records, a fake fetch will be injected with the error.

If you are group consuming, a rebalance can happen under the hood while you process the returned fetches. This can result in duplicate work, and you may accidentally commit to partitions that you no longer own. You can prevent this by using BlockRebalanceOnPoll, but this comes with different tradeoffs. See the documentation on BlockRebalanceOnPoll for more information.

Produce sends a Kafka record to the topic in the record's Topic field, calling an optional `promise` with the record and a potential error when Kafka replies. For a synchronous produce, see ProduceSync. Records are produced in order per partition if the record is produced successfully. Successfully produced records will have their attributes, offset, and partition set before the promise is called. All promises are called serially (and should be relatively fast). If a record's timestamp is unset, this sets the timestamp to time.Now.

If the topic field is empty, the client will use the DefaultProduceTopic; if that is also empty, the record is failed immediately. If the record is too large to fit in a batch on its own in a produce request, the record will be failed immediately with kerr.MessageTooLarge.

If the client is configured to automatically flush and the client currently has the configured maximum amount of records buffered, Produce will block. The context can be used to cancel waiting while records flush to make space. In contrast, if manual flushing is configured, the record will be failed immediately with ErrMaxBuffered (this same behavior can be had with TryProduce).

Once a record is buffered into a batch, it can be canceled in three ways: canceling the context, the record timing out, or hitting the maximum retries. If any of these conditions are hit and it is currently safe to fail records, all buffered records for the relevant partition are failed. Only the first record's context in a batch is considered when determining whether the batch should be canceled. A record is not safe to fail if the client is idempotently producing and a request has been sent; in this case, the client cannot know if the broker actually processed the request (if so, then removing the records from the client will create errors the next time you produce).

If the client is transactional and a transaction has not been begun, the promise is immediately called with an error corresponding to not being in a transaction.

ProduceSync is a synchronous produce. See the Produce documentation for an in depth description of how producing works.

This function produces all records in one range loop and waits for them all to be produced before returning.

ProducerID returns, loading if necessary, the current producer ID and epoch. This returns an error if the producer ID could not be loaded, if the producer ID has fatally errored, or if the context is canceled.

PurgeTopicsFromClient internally removes all internal information about the input topics. If you want to purge information for only consuming or only producing, see the related functions [PurgeTopicsFromConsuming] and [PurgeTopicsFromProducing].

For producing, this clears all knowledge that these topics have ever been produced to. Producing to the topic again may result in out of order sequence number errors, or, if idempotency is disabled and the sequence numbers align, may result in invisibly discarded records at the broker. Purging a topic that was previously produced to may be useful to free up resources if you are producing to many disparate and short lived topics in the lifetime of this client and you do not plan to produce to the topic anymore. You may want to flush buffered records before purging if records for a topic you are purging are currently in flight.

For consuming, this removes all concept of the topic from being consumed. This is different from PauseFetchTopics, which literally pauses the fetching of topics but keeps the topic information around for resuming fetching later. Purging a topic that was being consumed can be useful if you know the topic no longer exists, or if you are consuming via regex and know that some previously consumed topics no longer exist, or if you simply do not want to ever consume from a topic again. If you are group consuming, this function will likely cause a rebalance. If you are consuming via regex and the topic still exists on the broker, this function will at most only temporarily remove the topic from the client and the topic will be re-discovered.

For admin requests, this deletes the topic from the cached metadata map for sharded requests. Metadata for sharded admin requests is only cached for MetadataMinAge anyway, but the map is not cleaned up once the metadata expires. This function ensures the map is purged.

func (cl *Client) PurgeTopicsFromConsuming(topics ...string)

PurgeTopicsFromConsuming internally removes all internal information for consuming about the input topics. This runs the consumer bit of logic that is documented in [PurgeTopicsFromClient]; see that function for more details.

func (cl *Client) PurgeTopicsFromProducing(topics ...string)

PurgeTopicsFromProducing internally removes all internal information for producing about the input topics. This runs the producer bit of logic that is documented in [PurgeTopicsFromClient]; see that function for more details.

RemoveConsumePartitions removes partitions from being consumed. This function works only for direct, non-regex consumers.

This method does not purge the concept of any topics from the client -- if you remove all partitions from a topic that was being consumed, metadata fetches will still occur for the topic. If you want to remove the topic entirely, use PurgeTopicsFromClient.

If you specified ConsumeTopics and this function removes all partitions for a topic, the topic will no longer be consumed.

Request issues a request to Kafka, waiting for and returning the response. If a retryable network error occurs, or if a retryable group / transaction coordinator error occurs, the request is retried. All other errors are returned.

If the request is an admin request, this will issue it to the Kafka controller. If the controller ID is unknown, this will attempt to fetch it. If the fetch errors, this will return an unknown controller error.

If the request is a group or transaction coordinator request, this will issue the request to the appropriate group or transaction coordinator.

For transaction requests, the request is issued to the transaction coordinator. However, if the request is an init producer ID request and the request has no transactional ID, the request goes to any broker.

Some requests need to be split and sent to many brokers. For these requests, it is *highly* recommended to use RequestSharded. Not all responses from many brokers can be cleanly merged. However, for the requests that are split, this does attempt to merge them in a sane way.

The following requests are split:

ListOffsets
OffsetFetch (if using v8+ for Kafka 3.0+)
FindCoordinator (if using v4+ for Kafka 3.0+)
DescribeGroups
ListGroups
DeleteRecords
OffsetForLeaderEpoch
DescribeConfigs
AlterConfigs
AlterReplicaLogDirs
DescribeLogDirs
DeleteGroups
IncrementalAlterConfigs
DescribeProducers
DescribeTransactions
ListTransactions

Kafka 3.0 introduced batch OffsetFetch and batch FindCoordinator requests. This function is forward and backward compatible: old requests will be batched as necessary, and batched requests will be split as necessary. It is recommended to always use batch requests for simplicity.

In short, this method tries to do the correct thing depending on what type of request is being issued.

The passed context can be used to cancel a request and return early. Note that if the request was written to Kafka but the context canceled before a response is received, Kafka may still operate on the received request.

If using this function to issue kmsg.ProduceRequest's, you must configure the client with the same RequiredAcks option that you use in the request. If you are issuing produce requests with 0 acks, you must configure the client with the same timeout you use in the request. The client will internally rewrite the incoming request's acks to match the client's configuration, and it will rewrite the timeout millis if the acks is 0. It is strongly recommended to not issue raw kmsg.ProduceRequest's.

RequestCachedMetadata returns a metadata response, using any cached topic data possible. Any topic with data cached longer than 'limit' has its metadata updated before being returned. If limit is zero or less, MetadataMinAge is used (default 5s).

This function is useful if you run a lot of functions that internally fetch metadata to execute. As an example, many functions in the kadm package all require metadata to run; those functions use cached metadata as much as possible.

This function does *not* return authorized operations, even if the request has IncludeClusterAuthorizedOperations or IncludeTopicAuthorizedOperations set to true. This function cannot be used to request topics via TopicID; the direct topic name must be used.

RequestSharded performs the same logic as Request, but returns all responses from any broker that the request was split to. This always returns at least one shard. If the request does not need to be issued (describing no groups), this issues the request to a random broker just to ensure that one shard exists.

There are only a few requests that are strongly recommended to explicitly use RequestSharded; the rest can use Request by default. These few requests are mentioned in the documentation for Request.

If, in the process of splitting a request, some topics or partitions are found to not exist, or Kafka replies that a request should go to a broker that does not exist, all those non-existent pieces are grouped into one request to the first seed broker. This will show up as a seed broker node ID (min int32) and the response will likely contain purely errors.

The response shards are ordered by broker metadata.

ResumeFetchPartitions resumes fetching the input partitions if they were previously paused. Resuming partitions that are not currently paused is a per-topic no-op. See the documentation on PauseFetchPartitions for more details.

ResumeFetchTopics resumes fetching the input topics if they were previously paused. Resuming topics that are not currently paused is a per-topic no-op. See the documentation on PauseFetchTopics for more details.

SeedBrokers returns all seed brokers.

SetOffsets sets any matching offsets in setOffsets to the given epoch/offset. Partitions that are not specified are not set. It is invalid to set topics that were not yet returned from a PollFetches: this function sets only partitions that were previously consumed, any extra partitions are skipped.

If directly consuming, this function operates as expected given the caveats of the prior paragraph.

If using transactions, it is advised to just use a GroupTransactSession and avoid this function entirely.

If using group consuming, it is strongly recommended to use this function outside of the context of a PollFetches loop and only when you know the group is not revoked (i.e., block any concurrent revoke while issuing this call) and to not use this concurrent with committing. Any other usage is prone to odd interactions.

TryProduce is similar to Produce, but rather than blocking if the client currently has MaxBufferedRecords or MaxBufferedBytes buffered, this fails immediately with ErrMaxBuffered. See the Produce documentation for more details.

UncommittedOffsets returns the latest uncommitted offsets. Uncommitted offsets are always updated on calls to PollFetches.

If there are no uncommitted offsets, this returns nil.

func (cl *Client) UnsafeAbortBufferedRecords()

UnsafeAbortBufferedRecords fails all unflushed records with ErrAborting and waits for there to be no buffered records. This function does NOT wait for any inflight produce requests to finish, meaning topics in the client may be in an invalid state and producing to an invalid-state topic may cause the client to enter a fatal failed state. If you want to produce to topics that were unsafely aborted, it is recommended to use PurgeTopicsFromClient to forcefully reset the topics before producing to them again.

When producing with idempotency enabled or with transactions, every record has a sequence number. The client must wait for inflight requests to have responses before failing a record, otherwise the client cannot know if a sequence number was seen by the broker and tracked or not seen by the broker and not tracked. By unsafely aborting, the client forcefully abandons all records, and producing to the topics again may re-use a sequence number and cause internal errors.

func (cl *Client) UpdateFetchMaxBytes(maxBytes, maxPartBytes int32)

UpdateFetchMaxBytes updates the max bytes that a fetch request will ask for and the max partition bytes that a fetch request will ask for each partition.

UpdateSeedBrokers updates the client's list of seed brokers. Over the course of a long period of time, you might replace all brokers that you originally specified as seeds. This command allows you to replace the client's list of seeds.

This returns an error if any of the input addrs is not a host:port. If the input list is empty, the function returns without replacing the seeds.

CompressFlag is a flag to instruct the compressor.

type CompressionCodec struct {
	
}

CompressionCodec configures how records are compressed before being sent.

Records are compressed within individual topics and partitions, inside of a RecordBatch. All records in a RecordBatch are compressed into one record for that batch.

GzipCompression enables gzip compression with the default compression level.

Lz4Compression enables lz4 compression with the fastest compression level.

NoCompression is a compression option that avoids compression. This can always be used as a fallback compression.

SnappyCompression enables snappy compression.

ZstdCompression enables zstd compression with the default compression level.

WithLevel changes the compression codec's "level", effectively allowing for higher or lower compression ratios at the expense of CPU speed.

For the zstd package, the level is a typed int; simply convert the type back to an int for this function.

If the level is invalid, compressors just use a default level.

type CompressionCodecType int8

CompressionCodecType is a bitfield specifying a Kafka-defined compression codec. Per spec, only four compression codecs are supported. However, if you control both the producer and consumer, you can technically override the codec to anything.

Compressor is an interface that defines how produce batches are compressed. You can override the default client internal compressor for more control over what compressors to use, level, and memory reuse.

DefaultCompressor returns the default client compressor. The returned compressor will compress produce batches in preference-order of the specified codecs. Usually, you only need to specify one codec. If you are speaking to an old broker that may not support zstd, you may need to specify a second compressor as fallback (old Kafka did not support zstd). If no codecs are specified, or the specified codec is CodecNone, this returns 'nil, nil'. A compressor is only used within the client if it is non-nil.

type ConsumerBalancer struct {
	
}

ConsumerBalancer is a helper type for writing balance plans that use the "consumer" protocol, such that each member uses a kmsg.ConsumerMemberMetadata in its join group request.

NewConsumerBalancer parses each member's metadata as a kmsg.ConsumerMemberMetadata and returns a ConsumerBalancer to use in balancing.

If any metadata parsing fails, this returns an error.

Balance satisfies the GroupMemberBalancer interface, but is never called because GroupMemberBalancerOrError exists.

BalanceOrError satisfies the GroupMemberBalancerOrError interface.

EachMember calls fn for each member and its corresponding metadata in the consumer group being balanced.

MemberAt returns the nth member and its corresponding metadata.

MemberTopics returns the unique set of topics that all members are interested in.

This can safely be called if the balancer is nil; if so, this will return nil.

Members returns the list of input members for this group balancer.

NewPlan returns a type that can be used to build a balance plan. The return satisfies the IntoSyncAssignment interface.

SetError allows you to set any error that occurred while balancing. This allows you to fail balancing and return nil from Balance.

ConsumerBalancerBalance is what the ConsumerBalancer invokes to balance a group.

This is a complicated interface, but in short, this interface has one function that implements the actual balancing logic: using the input balancer, balance the input topics and partitions. If your balancing can fail, you can use ConsumerBalancer.SetError(...) to return an error from balancing, and then you can simply return nil from Balance.

type ConsumerOpt interface {
	Opt
	
}

ConsumerOpt is a consumer specific option to configure a client. This is simply a namespaced Opt.

ConsumePartitions sets partitions to consume from directly and the offsets to start consuming those partitions from.

This option is basically a way to explicitly consume from subsets of partitions in topics, or to consume at exact offsets. Offsets from this option have higher precedence than the ConsumeResetOffset.

This option is not compatible with group consuming and regex consuming. If you want to assign partitions directly, but still use Kafka to commit offsets, check out the kadm package's FetchOffsets and CommitOffsets methods. These will allow you to commit as a group outside the context of a Kafka group.

ConsumePreferringLagFn allows you to re-order partitions before they are fetched, given each partition's current lag.

By default, the client rotates partitions fetched by one after every fetch request. Kafka answers fetch requests in the order that partitions are requested, filling the fetch response until FetchMaxBytes and FetchMaxPartitionBytes are hit. All partitions eventually rotate to the front, ensuring no partition is starved.

With this option, you can return topic order and per-topic partition ordering. These orders will sort to the front (first by topic, then by partition). Any topic or partitions that you do not return are added to the end, preserving their original ordering.

For a simple lag preference that sorts the laggiest topics and partitions first, use `kgo.ConsumePreferringLagFn(kgo.PreferLagAt(50))` (or some other similar lag number).

ConsumeRegex sets the client to parse all topics passed to ConsumeTopics as regular expressions.

When consuming via regex, every metadata request loads *all* topics, so that all topics can be passed to any regular expressions. Every topic is evaluated only once ever across all regular expressions; either it is permanently known to match, or it is permanently known to not match.

ConsumeResetOffset sets the offset to reset to if the client ever sees OffsetOutOfRange while fetching. If you do not set ConsumeStartOffset, this is also the offset to start consuming from when consuming a partition for the first time. The default is NewOffset().AtStart(), i.e., reset to the earliest offset. If using this option, it is strongly recommended to also set ConsumeStartOffset.

This option is *only* used if a consumer sees OffsetOutOfRange on the *first* fetch of a partition. If the consumer has consumed the partition at all and sees the error, it will automatically reset to the first offset after the timestamp of the last successfully consumed offset. If data loss occurred such that even the last successfully consumed offset is lost, the client automatically resets to the new current end offset. If you want to disable offset resetting entirely, you can use NoResetOffset.

If you use an exact or relative offset and the offset ends up out of range, the client chooses the nearest of either the log start offset or the log end offset. For example, using At(3) when the partition starts at 8 results in the partition being consumed from offset 8.

The following determines the offset for when a partition is seen for the first time, or reset while fetching:

at start?                         => reset to the log start offset
at end?                           => reset to the log end offset
at exact?                         => reset to an exact offset (3 means offset 3)
relative?                         => reset to the above, + / - the relative amount
exact/relative are out of bounds? => reset to the nearest boundary (start or end)
after millisec?                   => reset to the first offset after millisec if one exists, else the log end offset

To match Kafka's auto.offset.reset,

NewOffset().AtStart()     == auto.offset.reset "earliest"
NewOffset().AtEnd()       == auto.offset.reset "latest"
NewOffset().AtCommitted() == auto.offset.reset "none"

With the above, make sure to use NoResetOffset if you want to stop consuming when you encounter OffsetOutOfRange. It is highly recommended to read the docs for all Offset methods.

Be sure to check the documentation for ConsumeStartOffset, especially if you rely on this option as the start offset as well.

ConsumeStartOffset sets the offset to start consuming from when consuming a partition for the first time. If you do not set ConsumeResetOffset, this is also the offset to reset to if the client sees an OffsetOutOfRange error while consuming a partition. The default is NewOffset().AtStart(), i.e., start processing a partition from the earliest offset. If using this option, it is strongly recommended to also set ConsumeResetOffset.

If you use an exact or relative offset and the offset ends up out of range, the client chooses the nearest of either the log start offset or the log end offset. For example, using At(3) when the partition starts at 8 results in the partition being consumed from offset 8.

For group consuming, you can use Offset.AtCommitted to prevent starting consuming a partition in a group if the partition has no prior commits.

The following determines the offset for when a partition is seen for the first time:

at start?                         => start at the log start offset
at end?                           => start at the log end offset
at exact?                         => start at an exact offset (3 means offset 3)
relative?                         => start at the above, + / - the relative amount
exact/relative are out of bounds? => start at the nearest boundary (start or end)
after millisec?                   => start at first offset after millisec if one exists, else log end offset

To match Kafka's auto.offset.reset which is used for both the start offset and the reset offset,

NewOffset().AtStart()     == auto.offset.reset "earliest"
NewOffset().AtEnd()       == auto.offset.reset "latest"
NewOffset().AtCommitted() == auto.offset.reset "none"

Be sure to check the documentation for ConsumeResetOffset, especially if you rely on this option as the reset offset as well.

ConsumeTopics adds topics to use for consuming.

By default, consuming will start at the beginning of partitions. To change this, use the ConsumeResetOffset option.

DisableFetchCRCValidation disables crc32 checksum validation when fetching. This should only be used if you are working with a broker that does not properly support CRCs in record batches.

DisableFetchSessions sets the client to not use fetch sessions (Kafka 1.0+).

A "fetch session" is a way to reduce bandwidth for fetch requests & responses, and to potentially reduce the amount of work that brokers have to do to handle fetch requests. A fetch session opts into the broker tracking some state of what the client is interested in. For example, say that you are interested in thousands of topics, and most of these topics are receiving data only rarely. A fetch session allows the client to register that it is interested in those thousands of topics on the first request. On future requests, if the offsets for these topics have not changed, those topics will be elided from the request. The broker knows to reply with the extra topics if any new data is available, otherwise the topics are also elided from the response. This massively reduces the amount of information that needs to be included in requests or responses.

Using fetch sessions means more state is stored on brokers. Maintaining this state eats some memory. If you have thousands of consumers, you may not want fetch sessions to be used for everything. Brokers intelligently handle this by not creating sessions if they are at their configured limit, but you may consider disabling sessions if they are generally not useful to you. Brokers have metrics for the number of fetch sessions active, so you can monitor that to determine whether enabling or disabling sessions is beneficial or not.

For more details on fetch sessions, see KIP-227.

FetchIsolationLevel sets the "isolation level" used for fetching records, overriding the default ReadUncommitted.

FetchMaxBytes sets the maximum amount of bytes a broker will try to send during a fetch, overriding the default 50MiB. Note that brokers may not obey this limit if they have records larger than this limit. Also note that this client sends a fetch to each broker concurrently, meaning the client will buffer up to <brokers * max bytes> worth of memory.

This corresponds to the Java fetch.max.bytes setting.

If bumping this, consider bumping BrokerMaxReadBytes.

If what you are consuming is compressed, and compressed well, it is strongly recommended to set this option so that decompression does not eat all of your RAM.

FetchMaxPartitionBytes sets the maximum amount of bytes that will be consumed for a single partition in a fetch request, overriding the default 1MiB. Note that if a single batch is larger than this number, that batch will still be returned so the client can make progress.

This corresponds to the Java max.partition.fetch.bytes setting.

FetchMaxWait sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes before returning, overriding the default 5s.

This corresponds to the Java fetch.max.wait.ms setting.

FetchMinBytes sets the minimum amount of bytes a broker will try to send during a fetch, overriding the default 1 byte.

With the default of 1, data is sent as soon as it is available. By bumping this, the broker will try to wait for more data, which may improve server throughput at the expense of added latency.

This corresponds to the Java fetch.min.bytes setting.

KeepControlRecords sets the client to keep control messages and return them with fetches, overriding the default that discards them.

Generally, control messages are not useful.

KeepRetryableFetchErrors switches the client to always return any retryable broker error when fetching, rather than stripping them. By default, the client strips retryable errors from fetch responses; these are usually signals that a client needs to update its metadata to learn of where a partition has moved to (from one broker to another), or they are signals that one broker is temporarily unhealthy (broker not available). You can opt into keeping these errors if you want to specifically react to certain events. For example, if you want to react to deleting a topic yourself, you can watch for either UNKNOWN_TOPIC_OR_PARTITION or UNKNOWN_TOPIC_ID errors being returned in fetches (and ignore the other errors).

MaxConcurrentFetches sets the maximum number of fetch requests to allow in flight or buffered at once, overriding the unbounded (i.e. number of brokers) default.

This setting, paired with FetchMaxBytes, can upper bound the maximum amount of memory that the client can use for consuming.

Requests are issued to brokers in a FIFO order: once the client is ready to issue a request to a broker, it registers that request and issues it in order with other registrations.

If Kafka replies with any data, the client does not track the fetch as completed until the user has polled the buffered fetch. Thus, a concurrent fetch is not considered complete until all data from it is done being processed and out of the client itself.

Note that brokers are allowed to hang for up to FetchMaxWait before replying to a request, so if this option is too constrained and you are consuming a low throughput topic, the client may take a long time before requesting a broker that has new data. For high throughput topics, or if the allowed concurrent fetches is large enough, this should not be a concern.

A value of 0 implies the allowed concurrency is unbounded and will be limited only by the number of brokers in the cluster.

Rack specifies where the client is physically located and changes fetch requests to consume from the closest replica as opposed to the leader replica.

Consuming from a preferred replica can increase latency but can decrease cross datacenter costs. See KIP-392 for more information.

RecheckPreferredReplicaInterval configures how long the consumer should fetch from a preferred replica before switching back to the leader. Periodically switching back to the leader allows the leader to re-choose a perhaps better preferred replica (say you added a new cluster, or added nodes to an existing cluster, or something else changed). For implementation simplicity, the interval is checked after fetch responses, meaning one more request can be issued after the interval has elapsed.

The default interval is 30 minutes.

WithDecompressor allows you to completely control how fetch batches are decompressed, allowing you to use alternative libraries than what franz-go supports, allowing you to have more control over memory & pooling, and other benefits. The client default compressor is the DefaultDecompressor.

Decompressor is an interface that defines how fetch batches are decompressed. You can override the default client internal decompressor for more control over what decompressors to use and memory reuse.

DefaultDecompressor returns the default decompressor used by clients. The first pool provided that implements PoolDecompressBytes will be used where possible.

type EndBeginTxnHow uint8

EndBeginTxnHow controls the safety of how EndAndBeginTransaction executes.

type EpochOffset struct {
	
	
	
	
	
	Epoch int32

	
	
	
	Offset int64
}

EpochOffset combines a record offset with the leader epoch the broker was at when the record was written.

Less returns whether this EpochOffset is less than another. This is less than the other if this one's epoch is less, or if the epochs are equal and this one's offset is less.

ErrDataLoss is returned for Kafka >=2.1 when data loss is detected and the client is able to reset to the last valid offset.

type ErrFirstReadEOF struct {
	
}

ErrFirstReadEOF is returned for responses that immediately error with io.EOF. This is the client's guess as to why a read from a broker is failing with io.EOF. Two cases are currently handled.

There may be other reasons that an immediate io.EOF is encountered (perhaps the connection truly was severed before a response was received), but this error can help you quickly check common problems.

Unwrap returns io.EOF (or, if a custom dialer returned a wrapped io.EOF, this returns the custom dialer's wrapped error).

type ErrGroupSession struct {
	Err error
}

ErrGroupSession is injected into a poll if an error occurred such that your consumer group member was kicked from the group or was never able to join the group.

Fetch is an individual response from a broker.

type FetchBatchMetrics struct {
	
	
	
	
	
	
	NumRecords int

	
	
	
	
	
	
	
	
	
	
	
	UncompressedBytes int

	
	
	
	
	
	
	
	
	CompressedBytes int

	
	
	
	
	
	CompressionType uint8
}

FetchBatchMetrics tracks information about fetches of batches.

FetchError is an error in a fetch along with the topic and partition that the error was on.

FetchPartition is a response for a partition in a fetched topic from a broker.

ProcessFetchPartition processes all records in all batches or message sets in a *kmsg.FetchResponseTopicPartition, returning the processed FetchPartition and the offset of the last record that was processed. If hooks is non-nil, it is called with the metrics from processing this batch.

This function is useful when issuing manual Fetch requests for records or in any scenario where you want to process raw fetch responses.

EachRecord calls fn for each record in the partition.

FetchTopic is a response for a fetched topic from a broker.

EachPartition calls fn for each partition in the topic.

EachRecord calls fn for each record in the topic, in any partition order.

Records returns all records in all partitions in this topic.

This is a convenience function that does a single slice allocation. If you can process records individually, it is far more efficient to use the Each functions.

FetchTopicPartition is similar to FetchTopic, but for an individual partition.

EachRecord calls fn for each record in the topic's partition.

Fetches is a group of fetches from brokers.

NewErrFetch returns a fake fetch containing a single empty topic with a single zero partition with the given error.

EachError calls fn for every partition that had a fetch error with the topic, partition, and error.

This function has the same semantics as the Errors function; refer to the documentation on that function for what types of errors are possible.

EachPartition calls fn for each partition in Fetches.

Partitions are not visited in any specific order, and a topic may be visited multiple times if it is spread across fetches.

EachRecord calls fn for each record in Fetches.

This is very similar to using a record iter, and is solely a convenience function depending on which style you prefer.

EachTopic calls fn for each topic in Fetches.

This is a convenience function that groups all partitions for the same topic from many fetches into one FetchTopic. A map is internally allocated to group partitions per topic before calling fn.

Empty checks whether the fetch result is empty. This method is faster than checking NumRecords() == 0.

Err returns the first error in all fetches, if any. This can be used to quickly check if the client is closed or your poll context was canceled, or to check if there's some other error that requires deeper investigation with EachError. This function performs a linear scan over all fetched partitions. It is recommended to always check all errors. If you would like to more quickly check ahead of time if a poll was canceled because of closing the client or canceling the context, you can use Err0.

Err0 returns the error at the 0th index fetch, topic, and partition. This can be used to quickly check if polling returned early because the client was closed or the context was canceled and is faster than performing a linear scan over all partitions with Err. When the client is closed or the context is canceled, fetches will contain only one partition whose Err field indicates the close / cancel. Note that this returns whatever the first error is, nil or non-nil, and does not check for a specific error value.

Errors returns all errors in a fetch with the topic and partition that errored.

There are a few classes of errors possible:

  1. a normal kerr.Error; these are usually the non-retryable kerr.Errors, but theoretically a non-retryable error can be fixed at runtime (auth error? fix auth). It is worth restarting the client for these errors if you do not intend to fix this problem at runtime.

  2. an injected *ErrDataLoss; these are informational, the client automatically resets consuming to where it should and resumes. This error is worth logging and investigating, but not worth restarting the client for.

  3. an untyped batch parse failure; these are usually unrecoverable by restarts, and it may be best to just let the client continue. Restarting is an option, but you may need to manually repair your partition.

  4. an injected ErrClientClosed; this is a fatal informational error that is returned from every Poll call if the client has been closed. A corresponding helper function IsClientClosed can be used to detect this error.

  5. an injected context error; this can be present if the context you were using for polling timed out or was canceled.

  6. an injected ErrGroupSession; this is an informational error that is injected once a group session is lost in a way that is not the standard rebalance. This error can signify that your consumer member is not able to connect to the group (ACL problems, unreachable broker), or you blocked rebalancing for too long, or your callbacks took too long.

This list may grow over time.

IsClientClosed returns whether the fetches include an error indicating that the client is closed.

This function is useful to break out of a poll loop; you likely want to call this function before calling Errors. If you may cancel the context to poll, you may want to use Err0 and manually check errors.Is(ErrClientClosed) or errors.Is(context.Canceled).
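A poll loop that breaks out on close and inspects the remaining error classes might look like the following sketch, assuming a configured *kgo.Client `cl`, a context `ctx`, and a hypothetical per-record `process` function:

```go
for {
	fetches := cl.PollFetches(ctx)
	if fetches.IsClientClosed() {
		return // the client was closed; stop polling
	}
	fetches.EachError(func(topic string, partition int32, err error) {
		// Most injected errors are informational; see the error
		// classes documented above before deciding to restart.
		log.Printf("fetch error on %s/%d: %v", topic, partition, err)
	})
	fetches.EachRecord(process)
}
```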

NumRecords returns the total number of records across all fetched partitions.

RecordIter returns an iterator over all records in a fetch.

Note that errors should be inspected as well.

Records returns all records in all fetches.

This is a convenience function that does a single slice allocation. If you can process records individually, it is far more efficient to use the Each functions or the RecordIter.

type FetchesRecordIter struct {
	
}

FetchesRecordIter iterates over records in a fetch.

Done returns whether there are any more records to iterate over.

Next returns the next record from a fetch.
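A typical Done/Next loop, assuming a configured *kgo.Client `cl` and a context `ctx`:

```go
fetches := cl.PollFetches(ctx)
iter := fetches.RecordIter()
for !iter.Done() {
	rec := iter.Next()
	fmt.Printf("consumed %s/%d@%d\n", rec.Topic, rec.Partition, rec.Offset)
}
```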

type FirstErrPromise struct {
	
}

FirstErrPromise is a helper type to capture only the first failing error when producing a batch of records with this type's Promise function.

This is useful for when you only care about any record failing, and can use that as a signal (i.e., to abort a batch). The AbortingFirstErrPromise function can be used to abort all records as soon as the first error is encountered. If you do not need to abort, you can use this type with no constructor.

This is similar to using ProduceResult's FirstErr function.

AbortingFirstErrPromise returns a FirstErrPromise that will call the client's AbortBufferedRecords function if an error is encountered.

This can be used to quickly exit when any error is encountered, rather than waiting while flushing only to discover things errored.

Err waits for all promises to complete and then returns any stored error.

Promise returns a promise for producing that will store the first error encountered.

The returned promise must eventually be called, because a FirstErrPromise does not return from 'Err' until all promises are completed.
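Producing a batch with a FirstErrPromise can be sketched as follows, assuming a configured *kgo.Client `cl`, a context `ctx`, and a slice of records `recs`:

```go
var fep kgo.FirstErrPromise // or kgo.AbortingFirstErrPromise(cl) to abort on first error
for _, rec := range recs {
	cl.Produce(ctx, rec, fep.Promise())
}
// Err blocks until every promise above has completed.
if err := fep.Err(); err != nil {
	log.Printf("at least one record failed: %v", err)
}
```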

GroupBalancer balances topics and partitions among group members.

A GroupBalancer is roughly equivalent to Kafka's PartitionAssignor.

CooperativeStickyBalancer performs the sticky balancing strategy, but additionally opts the consumer group into "cooperative" rebalancing.

Cooperative rebalancing differs from "eager" (the original) rebalancing in that group members do not stop processing partitions during the rebalance. Instead, once they receive their new assignment, each member determines which partitions it needs to revoke. If any, they send a new join request (before syncing), and the process starts over. This should ultimately end up in only two join rounds, with the major benefit being that processing never needs to stop.

NOTE once a group is collectively using cooperative balancing, it is unsafe to have a member join the group that does not support cooperative balancing. If the only-eager member is elected leader, it will not know of the new multiple join strategy and things will go awry. Thus, once a group is entirely on cooperative rebalancing, it cannot go back.

Migrating an eager group to cooperative balancing requires two rolling bounce deploys. The first deploy should add the cooperative-sticky strategy as an option (that is, each member goes from using one balance strategy to two). During this deploy, Kafka will tell leaders to continue using the old eager strategy, since the old eager strategy is the only one in common among all members. The second rolling deploy removes the old eager strategy. At this point, Kafka will tell the leader to use cooperative-sticky balancing. During this roll, all members in the group that still have both strategies continue to be eager and give up all of their partitions every rebalance. However, once a member only has cooperative-sticky, it can begin using this new strategy and things will work correctly. See KIP-429 for more details.
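The two-deploy migration described above might be configured like this (broker address, group, and topic names are placeholders):

```go
// First rolling deploy: offer both strategies. Kafka keeps choosing the
// eager sticky strategy while any member supports only that one.
cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	kgo.ConsumerGroup("my-group"),
	kgo.ConsumeTopics("my-topic"),
	kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.StickyBalancer()),
)
// Second rolling deploy: drop kgo.StickyBalancer() from the Balancers
// list, leaving only kgo.CooperativeStickyBalancer().
```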

RangeBalancer returns a group balancer that, per topic, maps partitions to group members. Since this works on a topic level, uneven partitions per topic to the number of members can lead to slight partition consumption disparities.

Suppose there are two members M0 and M1, two topics t0 and t1, and each topic has three partitions p0, p1, and p2. The partition balancing will be

M0: [t0p0, t0p1, t1p0, t1p1]
M1: [t0p2, t1p2]

This is equivalent to the Java range balancer.

RoundRobinBalancer returns a group balancer that evenly maps topics and partitions to group members.

Suppose there are two members M0 and M1, two topics t0 and t1, and each topic has three partitions p0, p1, and p2. The partition balancing will be

M0: [t0p0, t0p2, t1p1]
M1: [t0p1, t1p0, t1p2]

If all members subscribe to all topics equally, the roundrobin balancer will give a perfect balance. However, if topic subscriptions are quite unequal, the roundrobin balancer may lead to a bad balance. See KIP-49 for one example (note that the fair strategy mentioned in KIP-49 does not exist).

This is equivalent to the Java roundrobin balancer.

StickyBalancer returns a group balancer that ensures minimal partition movement on group changes while also ensuring optimal balancing.

Suppose there are three members M0, M1, and M2, and two topics t0 and t1 each with three partitions p0, p1, and p2. If the initial balance plan looks like

M0: [t0p0, t0p1, t0p2]
M1: [t1p0, t1p1, t1p2]
M2: [t2p0, t2p1, t2p2]

If M2 disappears, both roundrobin and range would have mostly destructive reassignments.

Range would result in

M0: [t0p0, t0p1, t1p0, t1p1, t2p0, t2p1]
M1: [t0p2, t1p2, t2p2]

which is imbalanced and has 3 partitions move from members that did not need to move (t0p2, t1p0, t1p1).

RoundRobin would result in

M0: [t0p0, t0p2, t1p1, t2p0, t2p2]
M1: [t0p1, t1p0, t1p2, t2p1]

which is balanced, but has 2 partitions move when they do not need to (t0p1, t1p1).

Sticky balancing results in

M0: [t0p0, t0p1, t0p2, t2p0, t2p2]
M1: [t1p0, t1p1, t1p2, t2p1]

which is balanced and does not cause any unnecessary partition movement. The actual t2 partitions may not be in that exact combination, but they will be balanced.

An advantage of the sticky consumer is that it allows API users to potentially avoid some cleanup until after the consumer knows which partitions it is losing when it gets its new assignment. Users can then only cleanup state for partitions that changed, which will be minimal (see KIP-54; this client also includes the KIP-351 bugfix).

Note that this API implements the sticky partitioning quite differently from the Java implementation. The Java implementation is difficult to reason about and has many edge cases that result in non-optimal balancing (albeit, you likely have to be trying to hit those edge cases). This API uses a different algorithm to ensure optimal balancing while being an order of magnitude faster.

Since the new strategy is a strict improvement over the Java strategy, it is entirely compatible. Any Go client sharing a group with a Java client will not have its decisions undone on leadership change from a Go consumer to a Java one. Java balancers do not apply the strategy it comes up with if it deems the balance score equal to or worse than the original score (the score being effectively equal to the standard deviation of the mean number of assigned partitions). This Go sticky balancer is optimal and extra sticky. Thus, the Java balancer will never back out of a strategy from this balancer.

GroupMemberBalancer balances topics amongst group members. If your balancing can fail, you can implement GroupMemberBalancerOrError.

GroupMemberBalancerOrError is an optional extension interface for GroupMemberBalancer. This can be implemented if your balance function can fail.

For interface purposes, it is required to implement GroupMemberBalancer, but Balance will never be called.

type GroupOpt interface {
	Opt
	
}

GroupOpt is a consumer group specific option to configure a client. This is simply a namespaced Opt.

AdjustFetchOffsetsFn sets the function to be called when a group is joined after offsets are fetched so that a user can adjust offsets before consumption begins.

This function should not exceed the rebalance interval. It is possible for the group, immediately after finishing a balance, to re-enter a new balancing session. This function is passed a context that is canceled if the current group session finishes (i.e., after revoking).

If you are resetting the position of the offset, you may want to clear any existing "epoch" with WithEpoch(-1). If the epoch is non-negative, the client performs data loss detection, which may result in errors and unexpected behavior.

This function is called after OnPartitionsAssigned and may be called before or after OnPartitionsRevoked.

AutoCommitCallback sets the callback to use if autocommitting is enabled. This overrides the default callback that logs errors and continues.

AutoCommitInterval sets how long to go between autocommits, overriding the default 5s.

AutoCommitMarks switches the autocommitting behavior to only commit "marked" records, which can be done with the MarkCommitRecords method.

This option is basically a halfway point between autocommitting and manually committing. If you have slow batch processing of polls, then you can manually mark records to be autocommitted before you poll again. This way, if you usually take a long time between polls, your partial work can still be automatically checkpointed through autocommitting.

Balancers sets the group balancers to use for dividing topic partitions among group members, overriding the current default [cooperative-sticky]. This option is equivalent to Kafka's partition.assignment.strategies option.

For balancing, Kafka chooses the first protocol that all group members agree to support.

Note that if you opt into cooperative-sticky rebalancing, cooperative group balancing is incompatible with eager (classical) rebalancing and requires a careful rollout strategy (see KIP-429).

BlockRebalanceOnPoll switches the client to block rebalances whenever you poll until you explicitly call AllowRebalance. This option also ensures that any OnPartitions{Assigned,Revoked,Lost} callbacks are only called when you allow rebalances; they cannot be called if you have polled and are processing records.

By default, a consumer group is managed completely independently of consuming. A rebalance may occur at any moment. If you poll records, and then a rebalance happens, and then you commit, you may be committing to partitions you no longer own. This will result in duplicates. In the worst case, you could rewind commits that a different member has already made (risking duplicates if another rebalance were to happen before that other member commits again).

By blocking rebalancing after you poll until you call AllowRebalances, you can be sure that you commit records that your member currently owns. However, the big tradeoff is that by blocking rebalances, you put your group member at risk of waiting so long that the group member is kicked from the group because it exceeded the rebalance timeout. To compare clients, Sarama takes the default choice of blocking rebalancing; this option makes kgo more similar to Sarama.

If you use this option, you should ensure that you always process records quickly, and that your OnPartitions{Assigned,Revoked,Lost} callbacks are fast. It is recommended you also use PollRecords rather than PollFetches so that you can bound how many records you process at once. You must always AllowRebalances when you are done processing the records you received. Only rebalances that lose partitions are blocked; rebalances that are strictly net additions or non-modifications do not block (the On callbacks are always blocked so that you can ensure their serialization).

This function can largely replace any commit logic you may want to do in OnPartitionsRevoked.

Lastly, note that this actually blocks any rebalance from calling OnPartitions{Assigned,Revoked,Lost}. If you are using a cooperative rebalancer such as CooperativeSticky, a rebalance can begin right before you poll, and you will still receive records because no partitions are lost yet. The in-progress rebalance only blocks if you are assigned new partitions or if any of your partitions are revoked.
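A blocked-rebalance consume loop might be sketched as follows (broker address, group, and topic names are placeholders; `process` is a hypothetical per-record function):

```go
cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	kgo.ConsumerGroup("my-group"),
	kgo.ConsumeTopics("my-topic"),
	kgo.BlockRebalanceOnPoll(),
)
if err != nil {
	log.Fatal(err)
}
for {
	fetches := cl.PollRecords(ctx, 1000) // bound how much work each iteration does
	if fetches.IsClientClosed() {
		return
	}
	fetches.EachRecord(process)
	cl.AllowRebalance() // always allow rebalancing once processing is done
}
```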

ConsumerGroup sets the consumer group for the client to join and consume in. This option is required if using any other group options.

Note that when group consuming, the default is to autocommit every 5s. To be safe, autocommitting only commits what is *previously* polled. If you poll once, nothing will be committed. If you poll again, the first poll is available to be committed. This ensures at-least-once processing, but does mean there is likely some duplicate processing during rebalances. When your client shuts down, you should issue one final synchronous commit before leaving the group (because you will not be polling again, and you are not waiting for an autocommit).

DisableAutoCommit disables auto committing.

If you disable autocommitting, you may want to use a custom OnPartitionsRevoked, otherwise you may end up doubly processing records (which is fine, just leads to duplicate processing). Consider the scenario: you, member A, are processing partition 0, and previously committed offset 4 and have now locally processed through offset 30. A rebalance happens, and partition 0 moves to member B. If you use OnPartitionsRevoked, you can detect that you are losing this partition and commit your work through offset 30, so that member B can start processing at offset 30. If you do not commit (i.e. you do not use a custom OnPartitionsRevoked), the other member will start processing at offset 4. It may process through offset 50, leading to double processing of offsets 4 through 29. Worse, you, member A, can rewind member B's commit, because member B may commit offset 50 and you may finally eventually commit offset 30. If a rebalance happens, then even more duplicate processing will occur of offsets 30 through 49.

Again, OnPartitionsRevoked is not necessary, and not using it just means double processing, which for most workloads is fine since a simple group consumer is not EOS / transactional, only at-least-once. But, this is something to be aware of.
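The commit-on-revoke pattern described above might be sketched like this (broker address, group, and topic names are placeholders):

```go
cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	kgo.ConsumerGroup("my-group"),
	kgo.ConsumeTopics("my-topic"),
	kgo.DisableAutoCommit(),
	kgo.OnPartitionsRevoked(func(ctx context.Context, cl *kgo.Client, _ map[string][]int32) {
		// Blocking commit of everything polled so far, so the next
		// owner of these partitions resumes where we left off.
		if err := cl.CommitUncommittedOffsets(ctx); err != nil {
			log.Printf("revoke commit failed: %v", err)
		}
	}),
)
```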

GreedyAutoCommit opts into committing everything that has been polled when autocommitting (the dirty offsets), rather than committing what has previously been polled. This option may result in message loss if your application crashes.

GroupProtocol sets the group's join protocol, overriding the default value "consumer". The only reason to override this is if you are implementing custom join and sync group logic.

HeartbeatInterval sets how long a group member goes between heartbeats to Kafka, overriding the default 3,000ms.

Kafka uses heartbeats to ensure that a group member's session stays active. This value can be any value lower than the session timeout, but should be no higher than 1/3rd the session timeout.

This corresponds to Kafka's heartbeat.interval.ms.

InstanceID sets the group consumer's instance ID, switching the group member from "dynamic" to "static".

Prior to Kafka 2.3, joining a group gave a group member a new member ID. The group leader could not tell if this was a rejoining member. Thus, any join caused the group to rebalance.

Kafka 2.3 introduced the concept of an instance ID, which can persist across restarts. This allows for avoiding many costly rebalances and allows for stickier rebalancing for rejoining members (since the ID for balancing stays the same). The main downsides are that you, the user of a client, have to manage instance IDs properly, and that it may take longer to rebalance in the event that a client legitimately dies.

When using an instance ID, the client does NOT send a leave group request when closing. This allows for the client to restart with the same instance ID and rejoin the group to avoid a rebalance. It is strongly recommended to increase the session timeout enough to allow time for the restart (remember that the default session timeout is 10s).

To actually leave the group, you must use an external admin command that issues a leave group request on behalf of this instance ID (see kcl), or you can manually use the kmsg package with a proper LeaveGroupRequest.

NOTE: Leaving a group with an instance ID is only supported in Kafka 2.4+.

NOTE: If you restart a consumer group leader that is using an instance ID, it will not cause a rebalance even if you change which topics the leader is consuming. If your cluster is 3.2+, this client internally works around this limitation and you do not need to trigger a rebalance manually.
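Static membership might be configured as follows (broker address, group name, and instance ID are placeholders; the instance ID must be unique per member and stable across restarts):

```go
cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	kgo.ConsumerGroup("my-group"),
	kgo.InstanceID("worker-3"),              // stable across restarts
	kgo.SessionTimeout(60*time.Second),      // long enough to cover a restart
)
```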

OnOffsetsFetched sets a function to be called after offsets have been fetched after a group has been balanced. This function is meant to allow users to inspect offset commit metadata. An error can be returned to exit this group session and exit back to join group.

This function should not exceed the rebalance interval. It is possible for the group, immediately after finishing a balance, to re-enter a new balancing session. This function is passed a context that is canceled if the current group session finishes (i.e., after revoking).

This function is called after OnPartitionsAssigned and may be called before or after OnPartitionsRevoked.

OnPartitionsAssigned sets the function to be called when a group is joined after partitions are assigned before fetches for those partitions begin.

This function, combined with OnPartitionsRevoked, should not exceed the rebalance interval. It is possible for the group to re-enter a new balancing session immediately after finishing a balance.

This function is passed the client's context, which is only canceled if the client is closed.

This function is not called concurrent with any other OnPartitions callback, and this function is given a new map that the user is free to modify.

This function can be called at any time you are polling or processing records. If you want to ensure this function is called serially with processing, consider the BlockRebalanceOnPoll option.

OnPartitionsLost sets the function to be called on "fatal" group errors, such as IllegalGeneration, UnknownMemberID, and authentication failures. This function differs from OnPartitionsRevoked in that it is unlikely that commits will succeed when partitions are outright lost, whereas commits likely will succeed when revoking partitions.

Because this function is called on any fatal group error, it is possible for this function to be called without the group ever being joined.

This function is not called concurrent with any other OnPartitions callback, and this function is given a new map that the user is free to modify.

This function can be called at any time you are polling or processing records. If you want to ensure this function is called serially with processing, consider the BlockRebalanceOnPoll option.

OnPartitionsRevoked sets the function to be called once this group member has partitions revoked.

This function, combined with OnPartitionsAssigned, should not exceed the rebalance interval. It is possible for the group to re-enter a new balancing session immediately after finishing a balance.

If autocommit is enabled, the default OnPartitionsRevoked is a blocking commit of all non-dirty offsets (where "dirty" is the most recent poll).

The OnPartitionsRevoked function is passed the client's context, which is only canceled if the client is closed. OnPartitionsRevoked function is called at the end of a group session even if there are no partitions being revoked. If you are committing offsets manually (have disabled autocommitting), it is highly recommended to do a proper blocking commit in OnPartitionsRevoked.

This function is not called concurrent with any other OnPartitions callback, and this function is given a new map that the user is free to modify.

This function can be called at any time you are polling or processing records. If you want to ensure this function is called serially with processing, consider the BlockRebalanceOnPoll option.

This function is called if a "fatal" group error is encountered and you have not set OnPartitionsLost. See OnPartitionsLost for more details.

RebalanceTimeout sets how long group members are allowed to take when a rebalance has begun, overriding the default 60,000ms. This timeout is how long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat).

Kafka uses the largest rebalance timeout of all members in the group. If a member does not rejoin within this timeout, Kafka will kick that member from the group.

This corresponds to Kafka's rebalance.timeout.ms.


RequireStableFetchOffsets sets the group consumer to require "stable" fetch offsets before consuming from the group. Proposed in KIP-447 and introduced in Kafka 2.5, stable offsets are important when consuming from partitions that a transactional producer could be committing to.

With this option, Kafka will block group consumers from fetching offsets for partitions that are in an active transaction. This option is **strongly** recommended to help prevent duplication problems. See this repo's KIP-447 doc to learn more.

Because this can block consumption, it is strongly recommended to set transactional timeouts to a small value (10s) rather than the default 60s. Lowering the transactional timeout will reduce the chance that consumers are entirely blocked.

SessionTimeout sets how long a member in the group can go between heartbeats, overriding the default 45,000ms. If a member does not heartbeat in this timeout, the broker will remove the member from the group and initiate a rebalance.

If you are using a GroupTransactSession for EOS, wish to lower this, and are talking to a Kafka cluster pre 2.5, consider lowering the TransactionTimeout. If you do not, you risk a transaction finishing after a group has rebalanced, which could lead to duplicate processing. If you are talking to a Kafka 2.5+ cluster, you can safely use the RequireStableFetchOffsets group option and prevent any problems.

This option corresponds to Kafka's session.timeout.ms setting and must be within the broker's group.min.session.timeout.ms and group.max.session.timeout.ms.

type GroupTransactSession struct {
	
}

GroupTransactSession abstracts away the proper way to begin and end a transaction when consuming in a group, modifying records, and producing (EOS).

If you are running Kafka 2.5+, it is strongly recommended that you also use RequireStableFetchOffsets. See that config option's documentation for more details.

NewGroupTransactSession is exactly the same as NewClient, but wraps the client's OnPartitionsRevoked / OnPartitionsLost to ensure that transactions are correctly aborted whenever necessary so as to properly provide EOS.

When ETLing in a group in a transaction, if a rebalance happens before the transaction is ended, you either (a) must block the rebalance from finishing until you are done producing, and then commit before unblocking, or (b) allow the rebalance to happen, but abort any work you did.

The problem with (a) is that if your ETL work loop is slow, you run the risk of exceeding the rebalance timeout and being kicked from the group. You will try to commit, and depending on the Kafka version, the commit may even be erroneously successful (pre Kafka 2.5). This will lead to duplicates.

Instead, for safety, a GroupTransactSession favors (b). If a rebalance occurs at any time before ending a transaction with a commit, this will abort the transaction.

This leaves the risk that ending the transaction itself exceeds the rebalance timeout, but this is just one request with no cpu logic. With a proper rebalance timeout, this single request will not fail and the commit will succeed properly.

If this client detects you are talking to a pre-2.5 cluster, OR if you have not enabled RequireStableFetchOffsets, the client will sleep for 200ms after a successful commit to allow Kafka's txn markers to propagate. This is not foolproof in the event of some extremely unlikely communication patterns and **potentially** could allow duplicates. See this repo's transaction's doc for more details.

AllowRebalance is a wrapper around Client.AllowRebalance, with the exact same semantics. Refer to that function's documentation.

Begin begins a transaction, returning an error if the client has no transactional id or is already in a transaction. Begin must be called before producing records in a transaction.

Client returns the underlying client that this transact session wraps. This can be useful for functions that require a client, such as raw requests. The returned client should not be used to manage transactions (leave that to the GroupTransactSession).

Close is a wrapper around Client.Close, with the exact same semantics. Refer to that function's documentation.

This function must be called to leave the group before shutting down.

CloseAllowingRebalance is a wrapper around Client.CloseAllowingRebalance, with the exact same semantics. Refer to that function's documentation.

End ends a transaction, committing if commit is true, if the group did not rebalance since the transaction began, and if committing offsets is successful. If any of these conditions are false, this aborts. This flushes or aborts depending on `commit`.

This returns whether the transaction committed or any error that occurred. No returned error is retryable. Either the transactional ID has entered a failed state, or the client retried so much that the retry limit was hit, and odds are you should not continue. While a context is allowed, canceling it will likely leave the client in an invalid state. Canceling should only be done if you want to shut down.
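A Begin/End ETL loop might be sketched as follows (broker address, transactional ID, group, and topic names are placeholders; the transform is elided):

```go
sess, err := kgo.NewGroupTransactSession(
	kgo.SeedBrokers("localhost:9092"),
	kgo.TransactionalID("my-txn-id"),
	kgo.ConsumerGroup("etl-group"),
	kgo.ConsumeTopics("src"),
	kgo.DefaultProduceTopic("dst"),
	kgo.RequireStableFetchOffsets(),
)
if err != nil {
	log.Fatal(err)
}
defer sess.Close()
for {
	fetches := sess.PollFetches(ctx)
	if fetches.IsClientClosed() {
		return
	}
	if err := sess.Begin(); err != nil {
		log.Fatal(err)
	}
	fetches.EachRecord(func(r *kgo.Record) {
		// Transform as needed before producing.
		sess.Produce(ctx, kgo.StringRecord(string(r.Value)), nil)
	})
	committed, err := sess.End(ctx, kgo.TryCommit)
	if err != nil {
		log.Fatal(err) // not retryable; see above
	}
	if !committed {
		continue // a rebalance interrupted the session; the work was aborted
	}
}
```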

PollFetches is a wrapper around Client.PollFetches, with the exact same semantics. Refer to that function's documentation.

It is invalid to call PollFetches concurrently with Begin or End.

PollRecords is a wrapper around Client.PollRecords, with the exact same semantics. Refer to that function's documentation.

It is invalid to call PollRecords concurrently with Begin or End.

Produce is a wrapper around Client.Produce, with the exact same semantics. Refer to that function's documentation.

It is invalid to call Produce concurrently with Begin or End.

ProduceSync is a wrapper around Client.ProduceSync, with the exact same semantics. Refer to that function's documentation.

It is invalid to call ProduceSync concurrently with Begin or End.

TryProduce is a wrapper around Client.TryProduce, with the exact same semantics. Refer to that function's documentation.

It is invalid to call TryProduce concurrently with Begin or End.

Hook is a hook to be called when something happens in kgo.

The base Hook interface is meaningless, but wherever a hook can occur in kgo, the client checks if your hook implements an appropriate interface. If so, your hook is called.

This allows you to only hook in to behavior you care about, and it allows the client to add more hooks in the future.

All hook interfaces in this package have Hook in the name. Hooks must be safe for concurrent use. It is expected that hooks are fast; if a hook needs to take time, then copy what you need and ensure the hook is async.

HookBrokerConnect is called after a connection to a broker is opened.
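A hook is any type implementing one of these interfaces, registered via the WithHooks option. For example, a connection logger might look like this sketch (the OnBrokerConnect signature is assumed from this package's HookBrokerConnect interface):

```go
type connectLogger struct{}

func (connectLogger) OnBrokerConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) {
	if err != nil {
		log.Printf("dial broker %d failed after %v: %v", meta.NodeID, dialDur, err)
		return
	}
	log.Printf("connected to broker %d in %v", meta.NodeID, dialDur)
}

// Registered at client creation:
// cl, err := kgo.NewClient(kgo.WithHooks(connectLogger{}), /* other opts */)
```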

HookBrokerDisconnect is called when a connection to a broker is closed.

HookBrokerE2E is called after a write to a broker that errors, or after a read from a broker.

This differs from HookBrokerRead and HookBrokerWrite by tracking all E2E info for a write and a read, which allows for easier e2e metrics. This hook can replace both the read and write hook.

HookBrokerRead is called after a read from a broker.

Kerberos SASL does not cause read hooks, since it directly reads from the connection.

HookBrokerThrottle is called after a response to a request is read from a broker, and the response identifies throttling in effect.

HookBrokerWrite is called after a write to a broker.

Kerberos SASL does not cause write hooks, since it directly writes to the connection.

type HookClientClosed interface {
	
	
	OnClientClosed(*Client)
}

HookClientClosed is called in Close or CloseAfterRebalance after a client has been closed. This hook can be used to perform final cleanup work.

HookFetchBatchRead is called whenever a batch is read within the client.

Note that this hook is called when processing, but a batch may be internally discarded after processing in some uncommon specific circumstances.

If the client reads v0 or v1 message sets, and they are not compressed, then this hook will be called per record.

type HookFetchRecordBuffered interface {
	
	
	OnFetchRecordBuffered(*Record)
}

HookFetchRecordBuffered is called when a record is internally buffered after fetching, ready to be polled.

This hook can be used to write gauge metrics regarding the number of records or bytes buffered, or to write interceptors that modify a record before being returned from polling. If you just want a metric for the number of records buffered, use the client's BufferedFetchRecords method, as it is faster.

Note that this hook will slow down high-volume consuming a bit.

type HookFetchRecordUnbuffered interface {
	
	
	
	OnFetchRecordUnbuffered(r *Record, polled bool)
}

HookFetchRecordUnbuffered is called when a fetched record is unbuffered.

A record can be internally discarded without being polled in some scenarios, such as when the internal assignment changes.

As an example, if using HookFetchRecordBuffered for a gauge of how many record bytes are buffered ready to be polled, this hook can be used to decrement the gauge.
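The buffered / unbuffered pairing described above can be sketched as a gauge. This is a standalone illustration: Record here is a local stand-in for this package's Record type (which has a Value []byte field), so the sketch runs without the library; in real hooks, the methods would take *kgo.Record.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Record is a local stand-in for kgo.Record so this sketch runs standalone.
type Record struct{ Value []byte }

// byteGauge follows the documented hook method shapes:
// OnFetchRecordBuffered(*Record) and OnFetchRecordUnbuffered(*Record, bool).
// It tracks how many fetched record bytes are currently buffered.
type byteGauge struct{ buffered atomic.Int64 }

// OnFetchRecordBuffered increments the gauge when a record is buffered.
func (g *byteGauge) OnFetchRecordBuffered(r *Record) {
	g.buffered.Add(int64(len(r.Value)))
}

// OnFetchRecordUnbuffered decrements the gauge when the record leaves the buffer,
// whether or not it was actually polled.
func (g *byteGauge) OnFetchRecordUnbuffered(r *Record, polled bool) {
	g.buffered.Add(-int64(len(r.Value)))
}

func main() {
	var g byteGauge
	r := &Record{Value: make([]byte, 128)}
	g.OnFetchRecordBuffered(r)
	fmt.Println(g.buffered.Load()) // 128
	g.OnFetchRecordUnbuffered(r, true)
	fmt.Println(g.buffered.Load()) // 0
}
```

A single type implementing both interfaces would be passed once to WithHooks; the client calls whichever hook methods the type implements.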

Note that this hook may slow down high-volume consuming a bit.

type HookGroupManageError interface {
	
	
	
	
	OnGroupManageError(error)
}

HookGroupManageError is called after every error that causes the client, operating as a group member, to break out of the group managing loop and backoff temporarily.

Specifically, any error that would result in OnPartitionsLost being called will result in this hook being called.

type HookNewClient interface {
	
	
	OnNewClient(*Client)
}

HookNewClient is called in NewClient after a client is initialized. This hook can be used to perform final setup work in your hooks.

HookProduceBatchWritten is called whenever a batch is known to be successfully produced.

type HookProduceRecordBuffered interface {
	
	
	
	
	OnProduceRecordBuffered(*Record)
}

HookProduceRecordBuffered is called when a record is buffered internally in the client from a call to Produce.

This hook can be used to write metrics that gather the number of records or bytes buffered, or the hook can be used to write interceptors that modify a record's key / value / headers before being produced. If you just want a metric for the number of records buffered, use the client's BufferedProduceRecords method, as it is faster.

Note that this hook may slow down high-volume producing a bit.

type HookProduceRecordPartitioned interface {
	
	
	
	
	
	
	OnProduceRecordPartitioned(*Record, int32)
}

HookProduceRecordPartitioned is called when a record is partitioned and internally ready to be flushed.

This hook can be used to create metrics of buffered records per partition, and then you can correlate that to partition leaders and determine which brokers are having problems.

Note that this hook will slow down high-volume producing and it is recommended to only use this temporarily or if you are ok with the performance hit.

type HookProduceRecordUnbuffered interface {
	
	
	
	OnProduceRecordUnbuffered(*Record, error)
}

HookProduceRecordUnbuffered is called just before a record's promise is finished; this is effectively a mirror of a record promise.

As an example, if using HookProduceRecordBuffered for a gauge of how many record bytes are buffered, this hook can be used to decrement the gauge.

Note that this hook will slow down high-volume producing a bit. As well, records that were buffered but are paused (and stripped internally before being returned to the user) will still be passed to this hook.

IntoSyncAssignment takes a balance plan and returns a list of assignments to use in a kmsg.SyncGroupRequest.

It is recommended to ensure the output is deterministic and ordered by member / topic / partitions.

type IsolationLevel struct {
	
}

IsolationLevel controls whether uncommitted or only committed records are returned from fetch requests.

ReadCommitted is an isolation level to only fetch committed records.

ReadUncommitted (the default) is an isolation level that returns the latest produced records, be they committed or not.

LogLevel designates which level the logger should log at.

const (
	
	LogLevelNone LogLevel = iota
	
	LogLevelError
	
	LogLevelWarn
	
	
	LogLevelInfo
	
	
	LogLevelDebug
)

Logger is used to log informational messages.

BasicLogger returns a logger that will print to dst in the following format:

prefix [LEVEL] message; key: val, key: val

prefixFn is optional; if non-nil, it is called for a per-message prefix.

Writes to dst are not checked for errors.
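As a sketch of wiring this up (the level, destination, and timestamp prefix are illustrative choices; the rest of the client configuration is assumed to exist elsewhere):

```go
kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, func() string {
	return time.Now().Format(time.RFC3339) + " "
}))
```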

Metric is a user-defined client side metric so that you can send user-defined client metrics to the broker and give your cluster operator insight into your client.

This type exists to support KIP-1076, which is an extension to KIP-714. Read either of those for more detail.

MetricType is the type of metric you are providing: either a gauge or a sum.

Offset is a message offset in a partition.

NoResetOffset returns an offset that can be used as a "none" option for the ConsumeResetOffset option. By default, NoResetOffset starts consuming from the beginning of partitions (similar to NewOffset().AtStart()). This can be changed with AtEnd, Relative, etc.

Using this offset will make it such that if OffsetOutOfRange is ever encountered while consuming, rather than trying to recover, the client will return the error to the user and enter a fatal state (for the affected partition).

AfterMilli returns an offset that consumes from the first offset after a given timestamp. This option is *not* compatible with any At options (nor Relative nor WithEpoch); using any of those will clear the special millisecond state.

This option can be used to consume at the end of existing partitions, but at the start of any new partitions that are created later:

AfterMilli(time.Now().UnixMilli())

By default when using this offset, if consuming encounters an OffsetOutOfRange error, consuming will reset to the first offset after this timestamp. You can use NoResetOffset().AfterMilli(...) to instead switch the client to a fatal state (for the affected partition).

At returns a copy of the calling offset, changing the returned offset to begin at exactly the requested offset.

There are two potential special offsets to use: -2 allows for consuming at the start, and -1 allows for consuming at the end. These two offsets are equivalent to calling AtStart or AtEnd.

If the offset is less than -2, the client bounds it to -2 to consume at the start.

AtCommitted copies 'o' and returns an offset that is used *only if* there is an existing commit. This is only useful for group consumers. If a partition being consumed does not have a commit, the partition will enter a fatal state and return an error from PollFetches.

Using this function automatically opts into NoResetOffset.

AtEnd copies 'o' and returns an offset starting at the end of a partition. If you want to consume at the end of the topic as it exists right now, but at the beginning of new partitions as they are added to the topic later, check out AfterMilli.

AtStart copies 'o' and returns an offset starting at the beginning of a partition.

EpochOffset returns this offset as an EpochOffset, allowing visibility into what this offset actually currently is.

MarshalJSON implements json.Marshaler.

Relative copies 'o' and returns an offset that starts 'n' relative to what 'o' currently is. If 'o' is at the end (from [AtEnd]), Relative(-100) will begin 100 before the end.

String returns the offset as a string; the purpose of this is for logs.

WithEpoch copies 'o' and returns an offset with the given epoch. This epoch is used for truncation detection; the default of -1 implies no truncation detection.
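Putting the builders above together (a sketch of option wiring passed to the ConsumeResetOffset option; the surrounding client configuration is assumed to exist elsewhere):

```go
// Start at the end of each partition:
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd())

// Start 100 records before the end:
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd().Relative(-100))

// Return an error to the user on OffsetOutOfRange instead of resetting:
kgo.ConsumeResetOffset(kgo.NoResetOffset())
```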

Opt is an option to configure a client.

func AllowAutoTopicCreation() Opt

AllowAutoTopicCreation enables topics to be auto created if they do not exist when fetching their metadata.

BrokerMaxReadBytes sets the maximum response size that can be read from Kafka, overriding the default 100MiB.

This is a safety measure to avoid OOMing on invalid responses. This is roughly double FetchMaxBytes; if bumping that, consider bumping this as well. No other response should run the risk of hitting this limit.

BrokerMaxWriteBytes upper bounds the number of bytes written to a broker connection in a single write, overriding the default 100MiB.

This number corresponds to a broker's socket.request.max.bytes, which defaults to 100MiB.

The only Kafka request that could come reasonably close to hitting this limit should be produce requests, and thus this limit is only enforced for produce requests.

ClientID uses id for all requests sent to Kafka brokers, overriding the default "kgo".

ConcurrentTransactionsBackoff sets the backoff interval to use during transactional requests in case we encounter CONCURRENT_TRANSACTIONS error, overriding the default 20ms.

Sometimes, when a client begins a transaction quickly enough after finishing a previous one, Kafka will return a CONCURRENT_TRANSACTIONS error. Clients are expected to backoff slightly and retry the operation. Lower backoffs may increase load on the brokers, while higher backoffs may increase transaction latency in clients.

Note that if brokers are hanging in this concurrent transactions state for too long, the client progressively increases the backoff.

ConnIdleTimeout is a rough amount of time to allow connections to idle before they are closed, overriding the default 20s.

In the worst case, a connection can be allowed to idle for up to 2x this time, while the average is expected to be 1.5x (essentially, a uniform distribution from this interval to 2x the interval).

It is possible that a connection can be reaped just as it is about to be written to, but the client internally retries in these cases.

Connections are not reaped if they are actively being written to or read from; thus, a request can take a really long time itself and not be reaped (however, this may lead to the RequestTimeoutOverhead).

ConsiderMissingTopicDeletedAfter sets the amount of time a topic can be missing from metadata responses _after_ loading it at least once before it is considered deleted, overriding the default of 15s. Note that for newer versions of Kafka, it may take a bit of time (~15s) for the cluster to fully recognize a newly created topic. If this option is set too low, there is some risk that the client will internally purge and re-see a topic a few times until the cluster fully broadcasts the topic creation.

DialTLS opts into dialing brokers with TLS. This is a shortcut for DialTLSConfig with an empty config. See DialTLSConfig for more details.

DialTLSConfig opts into dialing brokers with the given TLS config with a 10s dial timeout. This is a shortcut for manually specifying a tls dialer using the Dialer option. You can also change the default 10s timeout with DialTimeout.

Every dial, the input config is cloned. If the config's ServerName is not specified, this function uses net.SplitHostPort to extract the host from the broker being dialed and sets the ServerName. In short, it is not necessary to set the ServerName.

DialTimeout sets the dial timeout, overriding the default of 10s. This option is useful if you do not want to set a custom dialer, and is useful in tandem with DialTLSConfig.

Dialer uses fn to dial addresses, overriding the default dialer that uses a 10s dial timeout and no TLS.

The context passed to the dial function is the context used in the request that caused the dial. If the request is a client-internal request, the context is the context on the client itself (which is canceled when the client is closed).

This function has the same signature as net.Dialer's DialContext and tls.Dialer's DialContext, meaning you can use this function like so:

kgo.Dialer((&net.Dialer{Timeout: 10*time.Second}).DialContext)

or

kgo.Dialer((&tls.Dialer{...}).DialContext)

func DisableClientMetrics() Opt

DisableClientMetrics opts out of collecting and sending client metrics to the broker (if the broker supports receiving client metrics). By default, clients are recommended to gather a small set of metrics to help cluster operators debug client issues (rather than relying on clients which may not be instrumented at all).

For more details on client metrics, see KIP-714.

MaxVersions sets the maximum Kafka version to try, overriding the internal unbounded (latest stable) versions.

Note that specific max version pinning is required if trying to interact with versions pre 0.10.0. Otherwise, unless using more complicated requests that this client itself does not natively use, it is generally safe to opt for the latest version. If using the kmsg package directly to issue requests, it is recommended to pin versions so that new fields on requests do not get invalid default zero values before you update your usage.

MetadataMaxAge sets the maximum age for the client's cached metadata, overriding the default 5m, to allow detection of new topics, partitions, etc.

This corresponds to Kafka's metadata.max.age.ms.

MetadataMinAge sets the minimum time between metadata queries, overriding the default 5s. You may want to raise or lower this to reduce the number of metadata queries the client will make. Notably, if metadata detects an error in any topic or partition, it triggers itself to update as soon as allowed.

MinVersions sets the minimum Kafka version a request can be downgraded to, overriding the default of the lowest version.

This option is useful if you are issuing requests that you absolutely do not want to be downgraded; that is, if you are relying on features in newer requests, and you are not sure if your brokers can handle those features. By setting a min version, if the client detects it needs to downgrade past the version, it will instead avoid issuing the request.

Unlike MaxVersions, if a request is issued that is unknown to the min versions, the request is allowed. It is assumed that there is no lower bound for that request.

OnRebootstrapRequired sets the function to call when a metadata response has the REBOOTSTRAP_REQUIRED error. The function should return new seed brokers for the client to use, or an error. Internally, the client will then call UpdateSeedBrokers with the seeds you return. All other live connections to brokers are stopped and active requests are failed.

The REBOOTSTRAP_REQUIRED error was introduced in Kafka 4.0 as a way for Kafka to tell the client to stop all non-seed-broker connections and query the seed brokers again. Franz-go by default already periodically sends a request to a seed broker to prevent a scenario where all previously discovered brokers are down or unavailable, so this client does not have as much of a need for REBOOTSTRAP_REQUIRED. That said, this function can be useful if Kafka knows the client should specifically talk to seed brokers next, and it gives you a chance to update your seed brokers at the same time. If you do not want to update your seed brokers, you can just return the same value that you use in your SeedBrokers configuration option.

You can read KIP-1102 for more info about this option.

func RequestRetries(n int) Opt

RequestRetries sets the number of tries that retryable requests are allowed, overriding the default of 20.

This option does not apply to produce requests; to limit produce request retries / record retries, see RecordRetries.

RequestTimeoutOverhead uses the given time as overhead while deadlining requests, overriding the default overhead of 10s.

For most requests, the timeout is set to the overhead. However, for any request with a TimeoutMillis field, the overhead is added on top of the request's TimeoutMillis. This ensures that we give Kafka enough time to actually process the request given the timeout, while still having a deadline on the connection as a whole to ensure it does not hang.

For writes, the timeout is always the overhead. We buffer writes in our client before one quick flush, so we always expect the write to be fast.

Note that hitting the timeout kills a connection, which will fail any other active writes or reads on the connection.

This option is roughly equivalent to request.timeout.ms, but grants additional time to requests that have timeout fields.

RetryBackoffFn sets the backoff strategy for how long to backoff for a given amount of retries, overriding the default jittery exponential backoff that ranges from 250ms min to 2.5s max.

This (roughly) corresponds to Kafka's retry.backoff.ms setting and retry.backoff.max.ms (which was introduced with KIP-580).

RetryTimeout sets the upper limit on how long we allow a request to be issued and then reissued on failure. That is, this controls the total end-to-end maximum time we allow for trying a request. This overrides the default of:

JoinGroup: cfg.SessionTimeout (default 45s)
SyncGroup: cfg.SessionTimeout (default 45s)
Heartbeat: cfg.SessionTimeout (default 45s)
   others: 30s

This timeout applies to any request issued through a client's Request function. It does not apply to fetches nor produces.

A value of zero indicates no request timeout.

The timeout is evaluated after a request errors. If the time since the start of the first request plus any backoff for the latest failure is less than the retry timeout, the request will be issued again.

RetryTimeoutFn sets the upper limit on how long we allow a request to be issued and then reissued on failure. That is, this controls the total end-to-end maximum time we allow for trying a request. This overrides the default of:

JoinGroup: cfg.SessionTimeout (default 45s)
SyncGroup: cfg.SessionTimeout (default 45s)
Heartbeat: cfg.SessionTimeout (default 45s)
   others: 30s

This timeout applies to any request issued through a client's Request function. It does not apply to fetches nor produces.

The function is called with the request key that is being retried. While it is not expected that the request key will be used, including it gives users the opportunity to have different retry timeouts for different keys.

If the function returns zero, there is no retry timeout.

The timeout is evaluated after a request errors. If the time since the start of the first request plus any backoff for the latest failure is less than the retry timeout, the request will be issued again.

SASL appends sasl authentication options to use for all connections.

SASL is tried in order; if the broker supports the first mechanism, all connections will use that mechanism. If the first mechanism fails, the client will pick the first supported mechanism. If the broker does not support any client mechanisms, connections will fail.

SeedBrokers sets the seed brokers for the client to use, overriding the default 127.0.0.1:9092.

Any seeds that are missing a port use the default Kafka port 9092.

func SoftwareNameAndVersion(name, version string) Opt

SoftwareNameAndVersion sets the client software name and version that will be sent to Kafka as part of the ApiVersions request as of Kafka 2.4, overriding the default "kgo" and internal version number.

Kafka exposes this through metrics to help operators understand the impact of clients.

It is generally not recommended to set this. As well, if you do, the name and version must match the following regular expression:

[a-zA-Z0-9](?:[a-zA-Z0-9\.-]*[a-zA-Z0-9])?

Note this means neither the name nor version can be empty.

UserMetricsFn sets the function to call to add user metrics when rolling up client metrics to send to the broker. Every metric rollup, fn is called and returns an iterator. All metrics returned from the iterator are included in the client metric aggregation and are sent to the broker. It is your responsibility to ensure the metric name is formatted correctly (namespaced and following OpenTelemetry format), and you need to ensure your Sum metrics are monotonically increasing. See the documentation on Metric for more details.

For more details about the client sending metrics, see KIP-714. For more details about enhancing client metrics with user metrics, see KIP-1076.

WithContext sets the client to use a custom context.

By default, the client uses context.Background.

WithHooks sets hooks to call whenever relevant.

Hooks can be used to layer in metrics (such as Prometheus hooks) or anything else. The client will call all hooks in order. See the Hooks interface for more information, as well as any interface that contains "Hook" in the name to know the available hooks. A single hook can implement any or all hook interfaces, and only the hooks that it implements will be called.

WithLogger sets the client to use the given logger, overriding the default to not use a logger.

It is invalid to use a nil logger; doing so will cause panics.

WithPools sets memory pools to use wherever relevant.

Pools can be used to optimize memory usage for data that is frequently thrown away after a short usage. For a list of all supported pools, look at the documentation for any interface that begins with "Pool". Multiple pools may be used; the first pool that is received from is the first pool put back into. A single pool can implement any or all pool interfaces.

If you use pools for fetching, the record Context field will be populated. This field is used for recycling the underlying memory once Recycle is called; do not clear the field.

Partitioner creates topic partitioners to determine which partition messages should be sent to.

Note that a record struct is unmodified (minus a potential default topic) from producing through partitioning, so you can set fields in the record struct before producing to aid in partitioning with a custom partitioner.

BasicConsistentPartitioner wraps a single function to provide a Partitioner and TopicPartitioner (that function is essentially a combination of Partitioner.ForTopic and TopicPartitioner.Partition).

As a minimal example, if you do not care about the topic and you set the partition before producing:

kgo.BasicConsistentPartitioner(func(topic string) func(*Record, int) int {
        return func(r *Record, n int) int {
                return int(r.Partition)
        }
})

LeastBackupPartitioner prioritizes partitioning by three factors, in order:

  1. pin to the current pick until there is a new batch
  2. on new batch, choose the least backed up partition (the partition with the fewest buffered records)
  3. if multiple partitions are equally least-backed-up, choose one at random

This algorithm prioritizes least-backed-up throughput, which may result in unequal partitioning. It is likely that this algorithm will talk most to the broker that it has the best connection to.

This algorithm is resilient to brokers going down: if a few brokers die, it is possible your throughput will be so high that the maximum buffered records will be reached in the now-offline partitions before metadata responds that the broker is offline. With the standard partitioning algorithms, the only recovery is if the partition is remapped or if the broker comes back online. With the least backup partitioner, downed partitions will see slight backup, but then the other partitions that are still accepting writes will get all of the writes and your client will not be blocked.

Under ideal scenarios (no broker / connection issues), StickyPartitioner is equivalent to LeastBackupPartitioner. This partitioner is only recommended if you are a producer consistently dealing with flaky connections or problematic brokers and do not mind uneven load on your brokers.

ManualPartitioner is a partitioner that simply returns the Partition field that is already set on any record.

Any record with an invalid partition will be immediately failed. This partitioner is simply the partitioner that is demonstrated in the BasicConsistentPartitioner documentation.

RoundRobinPartitioner is a partitioner that round-robin's through all available partitions. This algorithm has lower throughput and causes higher CPU load on brokers, but can be useful if you want to ensure an even distribution of records to partitions.

StickyKeyPartitioner mirrors the default Java partitioner from Kafka's 2.4 release (see KIP-480 and KAFKA-8601) until their 3.3 release. This was replaced in 3.3 with the uniform sticky partitioner (KIP-794), which is reimplemented in this client as the UniformBytesPartitioner.

This is the same "hash the key consistently, if no key, choose random partition" strategy that the Java partitioner has always used, but rather than always choosing a random partition, the partitioner pins a partition to produce to until that partition rolls over to a new batch. Only when rolling to new batches does this partitioner switch partitions.

The benefit with this pinning is less CPU utilization on Kafka brokers. Over time, the random distribution is the same, but the brokers are handling on average larger batches.

hasher is optional; if nil, this will return a partitioner that partitions exactly how Kafka does. Specifically, the partitioner will use murmur2 to hash keys, will mask out the 32nd bit, and then will mod by the number of potential partitions.

StickyPartitioner is the same as StickyKeyPartitioner, but with no logic to consistently hash keys. That is, this only partitions according to the sticky partition strategy.

UniformBytesPartitioner is a redux of the StickyPartitioner, proposed in KIP-794 and released with the Java client in Kafka 3.3. This partitioner returns the same partition until 'bytes' is hit. At that point, a re-partitioning happens. If adaptive is false, this chooses a new random partition; otherwise, this chooses a broker based on the inverse of the backlog currently buffered for that broker. If keys is true, this uses standard hashing based on the record key for records with non-nil keys. hasher is optional; if nil, the default hasher is murmur2 (Kafka's default).

The point of this hasher is to create larger batches while producing the same amount to all partitions over the long run. Adaptive opts in to a slight imbalance so that this can produce more to brokers that are less loaded.

This implementation differs slightly from Kafka's because this does not account for the compressed size of a batch, nor batch overhead. For overhead, in practice, the overhead is relatively constant so it would affect all batches equally. For compression, this client does not compress until after a batch is created and frozen, so it is not possible to track compression. This client also uses the number of records for backup calculation rather than number of bytes, but the heuristic should be similar. Lastly, this client does not have a timeout for partition availability. Realistically, these will be the most backed up partitions so they should be chosen the least.

NOTE: This implementation may create sub-optimal batches if lingering is enabled. This client's default is to disable lingering. The patch used to address this in Kafka is KAFKA-14156 (which itself is not perfect in the context of disabling lingering). For more details, read KAFKA-14156.

PartitionerHasher returns a partition to use given the input data and number of partitions.

KafkaHasher returns a PartitionerHasher using hashFn that mirrors how Kafka partitions after hashing data. In Kafka, after hashing into a uint32, the hash is converted to an int32 and the high bit is stripped. Kafka by default uses murmur2 hashing, and the StickyKeyPartitioner uses this by default. Using this KafkaHasher function is only necessary if you want to change the underlying hashing algorithm.

SaramaCompatHasher returns a PartitionerHasher using hashFn that mirrors how Sarama partitions after hashing data.

Sarama has two differences from Kafka when partitioning:

1) In Kafka, when converting the uint32 hash to an int32, Kafka masks the high bit. In Sarama, if the high bit is 1 (i.e., the number as an int32 is negative), Sarama negates the number.

2) Kafka by default uses the murmur2 hashing algorithm. Sarama by default uses fnv-1a.
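The first difference can be made concrete with a standalone sketch of the post-hash step (illustrative only; as noted in the second difference, the real partitioners also differ in the hash function itself):

```go
package main

import "fmt"

// kafkaIndex masks out the high bit of the hash before taking the modulus,
// as Kafka does.
func kafkaIndex(h uint32, numPartitions int) int {
	return int(int32(h)&0x7fffffff) % numPartitions
}

// saramaIndex instead negates the value when the int32 is negative,
// as Sarama does.
func saramaIndex(h uint32, numPartitions int) int {
	v := int32(h)
	if v < 0 {
		v = -v
	}
	return int(v) % numPartitions
}

func main() {
	h := uint32(0x80000005) // high bit set: the two strategies diverge
	fmt.Println(kafkaIndex(h, 10), saramaIndex(h, 10))
}
```

For hashes whose high bit is unset, the two strategies agree; they only diverge when the int32 interpretation is negative.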

Sarama added a NewReferenceHashPartitioner function that attempted to align with Kafka, but the reference partitioner only fixed the first difference, not the second. Further customization options were added later that made it possible to exactly match Kafka when hashing.

In short, to *exactly* match the Sarama defaults, use the following:

kgo.StickyKeyPartitioner(kgo.SaramaCompatHasher(fnv32a))

Where fnv32a is a function that computes the 32-bit fnv-1a hash of its input.

func fnv32a(b []byte) uint32 {
	h := fnv.New32a()
	h.Reset()
	h.Write(b)
	return h.Sum32()
}

SaramaHasher is a historical, misnamed partitioner. This library's original implementation of SaramaHasher was incorrect; if you want an exact match for the Sarama partitioner, use the SaramaCompatHasher.

This partitioner remains because as it turns out, other ecosystems provide a similar partitioner and this partitioner is useful for compatibility.

In particular, using this function with a crc32.ChecksumIEEE hasher makes this partitioner match librdkafka's consistent partitioner, or the zendesk/ruby-kafka partitioner.

Pool is a memory pool to be used where relevant.

The base Pool interface is meaningless, but wherever a type can be pooled in kgo, the client checks if your pool implements an appropriate pool interface. If so, the pool is received from (Get), and when the data is done being used, the pool is put back into (Put).

All pool interfaces in this package have Pool in the name. Pools must be safe for concurrent use.

PoolDecompressBytes is a pool that returns a slice that decompressed data will be decoded into.

PoolKRecords is a pool that returns a slice that raw kmsg.Record's are decoded into.

type PoolRecords interface {
	
	GetRecords(n int) []Record
	
	PutRecords([]Record)
}

PoolRecords is a pool that returns a slice of Record's.

PreferLagFn accepts topic and partition lag, the previously determined topic order, and the previously determined per-topic partition order, and returns a new topic and per-topic partition order.

Most use cases will not need to look at the prior orders, but they exist if you want to get fancy.

You can return partial results: if you only return topics, partitions within each topic keep their prior ordering. If you only return some topics but not all, the topics you do not return / the partitions you do not return will retain their original ordering *after* your given ordering.

NOTE: torderPrior and porderPrior must not be modified. To avoid a bit of unnecessary allocations, these arguments are views into data that is used to build a fetch request.

PreferLagAt is a simple PreferLagFn that orders the largest lag first, for any topic that is collectively lagging more than preferLagAt, and for any partition that is lagging more than preferLagAt.

The function does not prescribe any ordering for topics that have the same lag. It is recommended to use a number more than 0 or 1: if you use 0, you may just always undo client ordering when there is no actual lag.

ProcessFetchPartitionOpts contains required inputs for processing a fetch partition and options for how records & offsets should be processed.

type ProduceBatchMetrics struct {
	
	
	NumRecords int

	
	
	
	
	
	
	
	UncompressedBytes int

	
	
	
	
	
	
	
	
	CompressedBytes int

	
	
	
	
	
	CompressionType uint8
}

ProduceBatchMetrics tracks information about successful produces to partitions.

ProduceResult is the result of producing a record in a synchronous manner.

ProduceResults is a collection of produce results.

First returns the first record and error in the produce results.

This function is useful if you only passed one record to ProduceSync.

FirstErr returns the first erroring result, if any.

type ProducerOpt interface {
	Opt
	
}

ProducerOpt is a producer specific option to configure a client. This is simply a namespaced Opt.

DefaultProduceTopic sets the default topic to produce to if the topic field is empty in a Record.

If this option is not used, if a record has an empty topic, the record cannot be produced and will be failed immediately.

DefaultProduceTopicAlways sets the client to ALWAYS produce to the DefaultProduceTopic, overriding any Topic field that may be present in the Record when producing.

DisableIdempotentWrite disables idempotent produce requests, opting out of Kafka server-side deduplication in the face of reissued requests due to transient network problems. Disabling idempotent write by default upper-bounds the number of in-flight produce requests per broker to 1, vs. the default of 5 when using idempotency.

Idempotent production is strictly a win, but does require the IDEMPOTENT_WRITE permission on CLUSTER (pre Kafka 3.0), and not all clients can have that permission.

This option is incompatible with specifying a transactional id.

ManualFlushing disables auto-flushing when producing. While you can still set lingering, it would be useless to do so.

With manual flushing, producing while MaxBufferedRecords or MaxBufferedBytes have already been produced and not flushed will return ErrMaxBuffered.

MaxBufferedBytes sets the max amount of bytes that the client will buffer while producing, blocking produces until records are finished if this limit is reached. This overrides the unlimited default.

Note that this option does _not_ apply for consuming: the client cannot limit bytes buffered for consuming because of decompression. You can roughly control consuming memory by using MaxConcurrentFetches, FetchMaxBytes, and FetchMaxPartitionBytes.

If you produce a record that is larger than n, the record is immediately failed with kerr.MessageTooLarge.

Note that this limit applies after MaxBufferedRecords.

MaxBufferedRecords sets the max amount of records the client will buffer, blocking produces until records are finished if this limit is reached. This overrides the default of 10,000.

MaxProduceRequestsInflightPerBroker changes the number of allowed produce requests in flight per broker if you disable idempotency, overriding the default of 1. If using idempotency, this option has no effect: the maximum in flight for Kafka v0.11 is 1, and from v1 onward is 5.

Using more than 1 may result in out of order records and may result in duplicates if there are connection issues.

ProduceRequestTimeout sets how long Kafka brokers are allowed to respond to produce requests, overriding the default 10s. If a broker exceeds this duration, it will reply with a request timeout error.

This somewhat corresponds to Kafka's request.timeout.ms setting, but only applies to produce requests. It sets the TimeoutMillis field in the produce request itself. The RequestTimeoutOverhead is applied as a write limit and read limit in addition to this.

ProducerBatchCompression sets the compression codec to use for producing records.

Compression is chosen in the order preferred based on broker support. For example, zstd compression was introduced in Kafka 2.1, so the preference can be first zstd, fallback snappy, fallback none.

The default preference is [snappy, none], which should be fine for all old consumers since snappy compression has existed since Kafka 0.8.0. To use zstd, your brokers must be at least 2.1 and all consumers must be upgraded to support decoding zstd records.

Alternatively, if you want finer control over compression you can use WithCompressor for complete control.

ProducerBatchMaxBytes upper bounds the size of a record batch, overriding the default 1,000,012 bytes. This mirrors Kafka's max.message.bytes.

Record batches are independent of a ProduceRequest: a record batch is specific to a topic and partition, whereas the produce request can contain many record batches for many topics.

If a single record encodes larger than this number (before compression), it will not be written and a callback will have the appropriate error.

Note that this is the maximum size of a record batch before compression. If a batch compresses poorly and actually grows the batch, the uncompressed form will be used.

ProducerLinger sets how long individual topic partitions will linger waiting for more records before triggering a request to be built.

Note that this option should only be used in low volume producers. The only benefit of lingering is to potentially build a larger batch to reduce cpu usage on the brokers if you have many producers all producing small amounts.

If a produce request is triggered by any topic partition, all partitions with a possible batch to be sent are used and all lingers are reset.

As mentioned, the linger is specific to topic partition. A high volume producer will likely be producing to many partitions; it is both unnecessary to linger in this case and inefficient because the client will have many timers running (and stopping and restarting) unnecessarily.

ProducerOnDataLossDetected sets a function to call if data loss is detected when producing records if the client is configured to continue on data loss. Thus, this option is mutually exclusive with StopProducerOnDataLossDetected.

The passed function will be called with the topic and partition that data loss was detected on.

RecordDeliveryTimeout sets a rough time of how long a record can sit around in a batch before timing out, overriding the unlimited default.

If idempotency is enabled (as it is by default), this option is only enforced if it is safe to do so without creating invalid sequence numbers. It is safe to enforce if a record was never issued in a request to Kafka, or if it was requested and received a response.

The timeout for all records in a batch inherit the timeout of the first record in that batch. That is, once the first record's timeout expires, all records in the batch are expired. This generally is a non-issue unless using this option with lingering. In that case, simply add the linger to the record timeout to avoid problems.

If a record times out, all records buffered in the same partition are failed as well. This ensures gapless ordering: the client will not fail one record only to produce a later one successfully. This also allows for easier sequence number ordering internally.

The timeout is only evaluated before writing a request or after a produce response. Thus, a sink backoff may delay record timeout slightly.

This option is roughly equivalent to delivery.timeout.ms.

RecordPartitioner uses the given partitioner to partition records, overriding the default UniformBytesPartitioner(64KiB, true, true, nil).

RecordRetries sets the number of tries for producing records, overriding the unlimited default.

If idempotency is enabled (as it is by default), this option is only enforced if it is safe to do so without creating invalid sequence numbers. It is safe to enforce if a record was never issued in a request to Kafka, or if it was requested and received a response.

If a record fails due to retries, all records buffered in the same partition are failed as well. This ensures gapless ordering: the client will not fail one record only to produce a later one successfully. This also allows for easier sequence number ordering internally.

If a topic repeatedly fails to load with UNKNOWN_TOPIC_OR_PARTITION, it has a different limit (the UnknownTopicRetries option). All records for a topic that repeatedly cannot be loaded are failed when that limit is hit.

This option is different from RequestRetries to allow finer grained control of when to fail when producing records.

RequiredAcks sets the required acks for produced records, overriding the default RequireAllISRAcks.

StopProducerOnDataLossDetected sets the client to stop producing if data loss is detected, overriding the default false.

Note that if using this option, it is strongly recommended to not have a retry limit. Doing so may lead to errors where the client fails a batch on a recoverable error, which internally bumps the idempotent sequence number used for producing, which may then later cause an inadvertent out of order sequence number and false "data loss" detection.

TransactionTimeout sets the allowed time for a transaction, overriding the default 40s. It is a good idea to keep this less than a group's session timeout, so that a group member will always be alive for the duration of a transaction even if connectivity dies. This helps prevent a transaction finishing after a rebalance, which is problematic pre-Kafka 2.5. If you are on Kafka 2.5+, then you can use the RequireStableFetchOffsets option when assigning the group, and you can set this to whatever you would like.

Transaction timeouts begin when the first record is produced within a transaction, not when a transaction begins.

TransactionalID sets a transactional ID for the client, ensuring that records are produced transactionally under this ID (exactly once semantics).

For Kafka-to-Kafka transactions, the transactional ID is only one half of the equation. You must also assign a group to consume from.

To produce transactionally, you first [BeginTransaction], then produce records consumed from a group, then you [EndTransaction]. All records produced outside of a transaction will fail immediately with an error.

After producing a batch, you must commit what you consumed. Auto committing offsets is disabled during transactional consuming / producing.

Note that unless using Kafka 2.5, a consumer group rebalance may be problematic. Production should finish and be committed before the client rejoins the group. It may be safer to use an eager group balancer and just abort the transaction. Alternatively, any time a partition is revoked, you could abort the transaction and reset offsets being consumed.

If the client detects an unrecoverable error, all records produced thereafter will fail.

Lastly, the default read level is READ_UNCOMMITTED. Be sure to use the ReadIsolationLevel option if you want to only read committed.

UnknownTopicRetries sets the number of times a record can fail with UNKNOWN_TOPIC_OR_PARTITION, overriding the default 4.

This is a separate limit from RecordRetries because unknown topic or partition errors should only happen if the topic does not exist. It is pointless for the client to continue producing to a topic that does not exist, and if we repeatedly see that the topic does not exist across multiple metadata queries (which are going to different brokers), then we may as well stop trying and fail the records.

If this is -1, the client never fails records with this error.

WithCompressor allows you to completely control how produce batches are compressed, allowing you to use alternative libraries than what franz-go supports, allowing you to have more control over memory & pooling, and other benefits. It is recommended to just use ProducerBatchCompression for simplicity (or specify nothing, which opts into snappy by default). The client default compressor is the DefaultCompressor.

Record is a record to write to Kafka.

KeySliceRecord returns a Record with the Key and Value fields set to the input key and value slices. For producing, this function is useful in tandem with the client-level DefaultProduceTopic option.

KeyStringRecord returns a Record with the Key and Value fields set to the input key and value strings. For producing, this function is useful in tandem with the client-level DefaultProduceTopic option.

This function uses the 'unsafe' package to avoid copying the key and value into slices.

NOTE: It is NOT SAFE to modify the record's key or value. This function should only be used if you only ever read record fields. This function can safely be used for producing; the client never modifies a record's key nor value fields.

SliceRecord returns a Record with the Value field set to the input value slice. For producing, this function is useful in tandem with the client-level DefaultProduceTopic option.

StringRecord returns a Record with the Value field set to the input value string. For producing, this function is useful in tandem with the client-level DefaultProduceTopic option.

This function uses the 'unsafe' package to avoid copying value into a slice.

NOTE: It is NOT SAFE to modify the record's value. This function should only be used if you only ever read record fields. This function can safely be used for producing; the client never modifies a record's key nor value fields.

AppendFormat appends a record to b given the layout or returns an error if the layout is invalid. This is a one-off shortcut for using NewRecordFormatter. See that function's documentation for the layout specification.

Recycle "recycles" this record if it was taken from a pool, and frees its attachment to any underlying pooled slices. If the pooled slice no longer has any records attached, the slices are put back into their pools.

This method is only relevant if you are using the WithPools option.

NOTE: It is invalid to continue using the record after calling recycle; doing so may result in corruption and data races. If you use PoolDecompressBytes, you cannot continue to use a shallow copy of any fields, you must clone them!

type RecordAttrs struct {
	
}

RecordAttrs contains additional meta information about a record, such as its compression or timestamp type.

CompressionType signifies with which algorithm this record was compressed.

0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is zstd. The returned uint8 can be converted directly to a CompressionCodecType.

IsControl returns whether a record is a "control" record (ABORT or COMMIT). These are generally not visible unless explicitly opted into.

IsTransactional returns whether a record is a part of a transaction.

TimestampType specifies how Timestamp was determined.

The default, 0, means that the timestamp was determined in a client when the record was produced.

An alternative is 1, which is when the Timestamp is set in Kafka.

Records pre 0.10.0 did not have timestamps and have value -1.

type RecordFormatter struct {
	
}

RecordFormatter formats records.

NewRecordFormatter returns a formatter for the given layout, or an error if the layout is invalid.

The formatter is very powerful, as such there is a lot to describe. This documentation attempts to be as succinct as possible.

Similar to the fmt package, record formatting is based off of slash escapes and percent "verbs" (copying fmt package lingo). Slashes are used for common escapes,

\t \n \r \\ \xNN

printing tabs, newlines, carriage returns, slashes, and hex encoded characters.

Percent encoding opts in to printing aspects of either a record or a fetch partition:

%t    topic
%T    topic length
%k    key
%K    key length
%v    value
%V    value length
%h    begin the header specification
%H    number of headers
%p    partition
%o    offset
%e    leader epoch
%d    timestamp (date, formatting described below)
%a    record attributes (formatting required, described below)
%x    producer id
%y    producer epoch

For AppendPartitionRecord, the formatter also understands the following three formatting options:

%[    partition log start offset
%|    partition last stable offset
%]    partition high watermark

The formatter internally tracks the number of times AppendRecord or AppendPartitionRecord have been called. The special option %i prints the iteration / call count:

%i    format iteration number (starts at 1)

Lastly, there are three escapes to print raw characters that are usually used for formatting options:

%%    percent sign
%{    left brace (required if a brace is after another format option)
%}    right brace

Specifying headers is essentially a primitive nested format option, accepting the key and value escapes above:

%K    header key length
%k    header key
%V    header value length
%v    header value

For example, "%H %h{%k %v }" will print the number of headers, and then each header key and value with a space after each.

Verb modifiers

Most of the previous verb specifications can be modified by adding braces with a given modifier, e.g., "%V{ascii}". All modifiers are described below.

Numbers

All number verbs accept braces that control how the number is printed:

%v{ascii}       the default, print the number as ascii
%v{number}      alias for ascii

%v{hex64}       print 16 hex characters for the number
%v{hex32}       print 8 hex characters for the number
%v{hex16}       print 4 hex characters for the number
%v{hex8}        print 2 hex characters for the number
%v{hex4}        print 1 hex character for the number
%v{hex}         print as many hex characters as necessary for the number

%v{big64}       print the number in big endian uint64 format
%v{big32}       print the number in big endian uint32 format
%v{big16}       print the number in big endian uint16 format
%v{big8}        alias for byte

%v{little64}    print the number in little endian uint64 format
%v{little32}    print the number in little endian uint32 format
%v{little16}    print the number in little endian uint16 format
%v{little8}     alias for byte

%v{byte}        print the number as a single byte
%v{bool}        print "true" if the number is non-zero, otherwise "false"

All numbers are truncated as necessary per each given format.

Timestamps

Timestamps can be specified in three formats: plain number formatting, native Go timestamp formatting, or strftime formatting. Number formatting follows the rules above using the millisecond timestamp value. Go and strftime have further internal format options:

%d{go##2006-01-02T15:04:05Z07:00##}
%d{strftime[%F]}

An arbitrary amount of pounds, braces, and brackets are understood before beginning the actual timestamp formatting. For Go formatting, the format is simply passed to the time package's AppendFormat function. For strftime, all "man strftime" options are supported. Time is always in UTC.

Attributes

Records attributes require formatting, where each formatting option selects which attribute to print and how to print it.

%a{compression}
%a{compression;number}
%a{compression;big64}
%a{compression;hex8}

By default, prints the compression as text ("none", "gzip", ...). Compression can be printed as a number with ";number", where number is any number formatting option described above.

%a{timestamp-type}
%a{timestamp-type;big64}

Prints -1 for pre-0.10 records, 0 for client generated timestamps, and 1 for broker generated. Number formatting can be controlled with ";number".

%a{transactional-bit}
%a{transactional-bit;bool}

Prints 1 if the record is a part of a transaction or 0 if it is not. Number formatting can be controlled with ";number".

%a{control-bit}
%a{control-bit;bool}

Prints 1 if the record is a commit marker or 0 if it is not. Number formatting can be controlled with ";number".

Text

Topics, keys, and values have "base64", "base64raw", "hex", and "unpack" formatting options:

%t{hex}
%k{unpack{<bBhH>iIqQc.$}}
%v{base64}
%v{base64raw}

Unpack formatting is inside of enclosing pounds, braces, or brackets, the same way that timestamp formatting is understood. The syntax roughly follows Python's struct packing/unpacking rules:

x    pad character (does not parse input)
<    parse what follows as little endian
>    parse what follows as big endian

b    signed byte
B    unsigned byte
h    int16  ("half word")
H    uint16 ("half word")
i    int32
I    uint32
q    int64  ("quad word")
Q    uint64 ("quad word")

c    any character
.    alias for c
s    consume the rest of the input as a string
$    match the end of the line (append error string if anything remains)

Unlike python, a '<' or '>' can appear anywhere in the format string and affects everything that follows. It is possible to switch endianness multiple times. If the parser needs more data than available, or if more input remains after '$', an error message will be appended.

AppendPartitionRecord appends a record and partition to b given the parsed format and returns the updated slice.

AppendRecord appends a record to b given the parsed format and returns the updated slice.

type RecordHeader struct {
}

RecordHeader contains extra information that can be sent with Records.

type RecordReader struct {
	
}

RecordReader reads records from an io.Reader.

NewRecordReader returns a record reader for the given layout, or an error if the layout is invalid.

Similar to the RecordFormatter, the RecordReader parsing is quite powerful. There is a bit less to describe in comparison to RecordFormatter, but still, this documentation attempts to be as succinct as possible.

Similar to the fmt package, record parsing is based off of slash escapes and percent "verbs" (copying fmt package lingo). Slashes are used for common escapes,

\t \n \r \\ \xNN

reading tabs, newlines, carriage returns, slashes, and hex encoded characters.

Percent encoding reads into specific values of a Record:

%t    topic
%T    topic length
%k    key
%K    key length
%v    value
%V    value length
%h    begin the header specification
%H    number of headers
%p    partition
%o    offset
%e    leader epoch
%d    timestamp
%x    producer id
%y    producer epoch

If using length / number verbs (i.e., "sized" verbs), they must occur before what they are sizing.

There are three escapes to parse raw characters, rather than opting into some formatting option:

%%    percent sign
%{    left brace
%}    right brace

Unlike record formatting, timestamps can only be read as numbers because Go or strftime formatting can both be variable length and do not play too well with delimiters. Timestamp numbers are read as milliseconds.

Numbers

All size numbers can be parsed in the following ways:

%v{ascii}       parse numeric digits until a non-numeric
%v{number}      alias for ascii

%v{hex64}       read 16 hex characters for the number
%v{hex32}       read 8 hex characters for the number
%v{hex16}       read 4 hex characters for the number
%v{hex8}        read 2 hex characters for the number
%v{hex4}        read 1 hex character for the number

%v{big64}       read the number as big endian uint64 format
%v{big32}       read the number as big endian uint32 format
%v{big16}       read the number as big endian uint16 format
%v{big8}        alias for byte

%v{little64}    read the number as little endian uint64 format
%v{little32}    read the number as little endian uint32 format
%v{little16}    read the number as little endian uint16 format
%v{little8}     read the number as a byte

%v{byte}        read the number as a byte
%v{bool}        read "true" as 1, "false" as 0
%v{3}           read 3 characters (any number)

Similar to number formatting, headers are parsed using a nested primitive format option, accepting the key and value escapes previously mentioned.

Text

Topics, keys, and values can be decoded using "base64", "hex", and "json" formatting options. Any size specification is the size of the encoded value actually being read (i.e., size as seen, not size when decoded). JSON values are compacted after being read.

%T%t{hex}     -  4abcd reads four hex characters "abcd"
%V%v{base64}  -  2z9 reads two base64 characters "z9"
%v{json} %k   -  {"foo" : "bar"} foo reads a JSON object and then "foo"

As well, these text options can be parsed with regular expressions:

%k{re[\d*]}%v{re[\s+]}

ReadRecord reads the next record in the reader and returns it, or returns a parsing error.

This will return io.EOF only if the underlying reader returns io.EOF at the start of a new record. If an io.EOF is returned mid record, this returns io.ErrUnexpectedEOF. It is expected for this function to be called until it returns io.EOF.

ReadRecordInto reads the next record into the given record and returns any parsing error.

This will return io.EOF only if the underlying reader returns io.EOF at the start of a new record. If an io.EOF is returned mid record, this returns io.ErrUnexpectedEOF. It is expected for this function to be called until it returns io.EOF.

SetReader replaces the underlying reader with the given reader.

ResponseShard ties together a request with either the response it received or an error that prevented a response from being received.

type TopicBackupIter interface {
	
	
	
	Next() (int, int64)
	
	Rem() int
}

TopicBackupIter iterates through partition indices.

TopicBackupPartitioner is an optional extension interface to TopicPartitioner that can partition by the number of records buffered.

If a partitioner implements this interface, the Partition function will never be called.

TopicPartitioner partitions records in an individual topic.

type TopicPartitionerOnNewBatch interface {
	
	
	OnNewBatch()
}

TopicPartitionerOnNewBatch is an optional extension interface to TopicPartitioner that calls OnNewBatch before any new batch is created. If buffering a record would cause a new batch, OnNewBatch is called.

This interface allows for partitioner implementations that effectively pin to a partition until a new batch is created, after which the partitioner can choose which next partition to use.

type TransactionEndTry bool

TransactionEndTry is simply a named bool.
