A complete Apache Kafka client written in Go
Franz-go is an all-encompassing Apache Kafka client fully written in Go. This library aims to provide every Kafka feature from Apache Kafka v0.8.0 onward. It has support for transactions, regex topic consuming, the latest partitioning strategies, data loss detection, closest replica fetching, and more. If a client KIP exists, this library aims to support it.
This library attempts to provide an intuitive API while interacting with Kafka the way Kafka expects (timeouts, etc.).
Producer batch compression is configurable per client; for example, the kgo.ProducerBatchCompression(kgo.NoCompression()) option disables compression entirely.
This repo contains multiple tags to allow separate features to be developed and released independently. The main client is in franz-go. Plugins are released from plugin/{plugin}. The raw-protocol package is released from pkg/kmsg, and the admin package is released from pkg/kadm.
The main client is located in the package github.com/twmb/franz-go/pkg/kgo, while the root of the project is at github.com/twmb/franz-go. There are a few extra packages within the project, as well as a few sub-modules. To use the main kgo package,
go get github.com/twmb/franz-go
To use a plugin,
go get github.com/twmb/franz-go/plugin/kzap
To use kadm,
go get github.com/twmb/franz-go/pkg/kadm
As an example, your require section in go.mod may look like this:
require (
	github.com/twmb/franz-go v1.12.0
	github.com/twmb/franz-go/pkg/kmsg v1.4.0
)
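To give a feel for the admin package, here is a minimal sketch of listing topics with kadm; the seed address is illustrative, and the ListTopics / TopicDetails usage reflects my reading of kadm's API, so check its docs before relying on it:

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// The admin client wraps an existing *kgo.Client.
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // illustrative seed address
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	adm := kadm.NewClient(cl)
	topics, err := adm.ListTopics(context.Background())
	if err != nil {
		panic(err)
	}
	for topic := range topics { // TopicDetails is keyed by topic name
		fmt.Println(topic)
	}
}
```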
Here's a basic overview of producing and consuming:
seeds := []string{"localhost:9092"}

// One client can both produce and consume!
// Consuming can either be direct (no consumer group), or through a group. Below, we use a group.
cl, err := kgo.NewClient(
	kgo.SeedBrokers(seeds...),
	kgo.ConsumerGroup("my-group-identifier"),
	kgo.ConsumeTopics("foo"),
)
if err != nil {
	panic(err)
}
defer cl.Close()

ctx := context.Background()

// 1.) Producing a message
// All record production goes through Produce, and the callback can be used
// to allow for synchronous or asynchronous production.
var wg sync.WaitGroup
wg.Add(1)
record := &kgo.Record{Topic: "foo", Value: []byte("bar")}
cl.Produce(ctx, record, func(_ *kgo.Record, err error) {
	defer wg.Done()
	if err != nil {
		fmt.Printf("record had a produce error: %v\n", err)
	}
})
wg.Wait()

// Alternatively, ProduceSync exists to synchronously produce a batch of records.
if err := cl.ProduceSync(ctx, record).FirstErr(); err != nil {
	fmt.Printf("record had a produce error while synchronously producing: %v\n", err)
}

// 2.) Consuming messages from a topic
for {
	fetches := cl.PollFetches(ctx)
	if errs := fetches.Errors(); len(errs) > 0 {
		// All errors are retried internally when fetching, but non-retriable errors are
		// returned from polls so that users can notice and take action.
		panic(fmt.Sprint(errs))
	}

	// We can iterate through a record iterator...
	iter := fetches.RecordIter()
	for !iter.Done() {
		record := iter.Next()
		fmt.Println(string(record.Value), "from an iterator!")
	}

	// or a callback function.
	fetches.EachPartition(func(p kgo.FetchTopicPartition) {
		for _, record := range p.Records {
			fmt.Println(string(record.Value), "from range inside a callback!")
		}

		// We can even use a second callback!
		p.EachRecord(func(record *kgo.Record) {
			fmt.Println(string(record.Value), "from a second callback!")
		})
	})
}
This only shows producing and consuming in the most basic sense, and does not show the full list of options to customize how the client runs, nor does it show transactional producing / consuming. Check out the examples directory for more!
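As a small taste of the transactional API (the examples directory remains the authoritative reference), here is a hedged sketch of producing inside a transaction; the transactional ID, topic, and seed address are all illustrative:

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// A transactional producer needs a transactional ID; the ID and topic here are illustrative.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-txn-id"),
		kgo.DefaultProduceTopic("txn-topic"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	ctx := context.Background()

	// Begin a transaction, produce within it, then commit (or abort on error).
	if err := cl.BeginTransaction(); err != nil {
		panic(err)
	}
	if err := cl.ProduceSync(ctx, kgo.StringRecord("in a transaction")).FirstErr(); err != nil {
		fmt.Printf("produce error, aborting: %v\n", err)
		_ = cl.EndTransaction(ctx, kgo.TryAbort)
		return
	}
	if err := cl.EndTransaction(ctx, kgo.TryCommit); err != nil {
		panic(err)
	}
}
```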
API reference documentation can be found on pkg.go.dev. Supplementary information can be found in the docs directory:
docs
├── admin requests — an overview of how to issue admin requests
├── metrics and logging — a small writeup on how to enable metrics & logging in franz-go, as well as a few thoughts on latency tracking
├── package layout — describes the packages in franz-go
├── producing and consuming — descriptions of producing & consuming & the guarantees
└── transactions — a description of transactions and the safety even in a pre-KIP-447 world
In alphabetical order,
If you use this library and want to be on the list above, please either open a PR or comment on #142!
By default, the client issues an ApiVersions request on connect to brokers and uses the maximum supported version for requests that each broker supports. If you want to pin to an exact version, you can use the MaxVersions option.
Kafka 0.10.0 introduced the ApiVersions request; if you are working with brokers older than that, you must use the kversion package. Use the MaxVersions option for the client if you do so.
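For instance, a minimal sketch of pinning request versions, assuming you want to cap the client at Kafka 2.4.0 protocol versions (the seed address is illustrative):

```go
package main

import (
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kversion"
)

func main() {
	// Pin all requests to at most the versions that Kafka 2.4.0 supports.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // illustrative seed address
		kgo.MaxVersions(kversion.V2_4_0()),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}
```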
Note that there are plug-in packages that allow you to easily add Prometheus metrics, go-metrics, zap logging, etc. to your client! See the plugin directory for more information! These plugins are provided under dedicated modules, e.g. github.com/twmb/franz-go/plugin/kprom@v1.0.0.
The franz-go client takes a neutral approach to metrics by providing hooks that you can use to plug in your own metrics.
All connections, disconnections, reads, writes, and throttles can be hooked into, as well as per-batch produce & consume metrics. If there is an aspect of the library that you wish you could have insight into, please open an issue and we can discuss adding another hook.
Hooks allow you to log in the event of specific errors, or to trace latencies, count bytes, etc., all with your favorite monitoring systems.
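As an illustration only, here is a hedged sketch of a custom hook that counts successful broker connections; it assumes the kgo.HookBrokerConnect interface signature and registers the hook with kgo.WithHooks:

```go
package main

import (
	"net"
	"sync/atomic"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// connectCounter is an illustrative hook that counts successful broker connections.
type connectCounter struct {
	connects atomic.Int64
}

// OnBrokerConnect is intended to satisfy the kgo.HookBrokerConnect interface.
func (c *connectCounter) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
	if err == nil {
		c.connects.Add(1)
	}
}

func main() {
	hook := new(connectCounter)
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // illustrative seed address
		kgo.WithHooks(hook),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}
```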
In addition to hooks, logging can be plugged in with a general Logger interface. A basic logger is provided if you just want to write to a given file in a simple format. All logs have a message and then key/value pairs of supplementary information. It is recommended to always use a logger and to use LogLevelInfo.
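For example, a minimal sketch of wiring in the built-in basic logger at LogLevelInfo (the seed address is illustrative):

```go
package main

import (
	"os"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Log to stderr at info level using the provided basic logger.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // illustrative seed address
		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, nil)),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}
```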
See this example for an expansive example of integrating with prometheus! Alternatively, see this example for how to use the plug-in prometheus package!
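A rough sketch of the plug-in route, assuming kprom's NewMetrics and Handler APIs (the namespace, address, and port are illustrative):

```go
package main

import (
	"net/http"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/plugin/kprom"
)

func main() {
	// The kprom metrics type is a bundle of kgo hooks and can serve its own registry.
	metrics := kprom.NewMetrics("my_namespace") // illustrative namespace
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // illustrative seed address
		kgo.WithHooks(metrics),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	http.Handle("/metrics", metrics.Handler())
	_ = http.ListenAndServe(":9999", nil) // illustrative port
}
```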
This client is quite fast; it is the fastest and most CPU- and memory-efficient client in Go.
For 100-byte messages:
This client is 4x faster at producing than confluent-kafka-go, and up to 10x-20x faster (at the expense of more memory usage) at consuming.
This client is 2.5x faster at producing than sarama, and 1.5x faster at consuming.
This client is 2.4x faster at producing than Segment's kafka-go, and anywhere from 2x to 6x faster at consuming.
To check benchmarks yourself, see the bench example. This example lets you produce or consume to a cluster and see the byte / record rate. The compare subdirectory shows comparison code.
Theoretically, this library supports every (non-Java-specific) client-facing KIP. Any KIP that simply adds or modifies a protocol is supported by code generation.
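To illustrate what code-generated protocol support looks like in practice, here is a hedged sketch of issuing a raw Metadata request through the kmsg package (the seed address is illustrative):

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // illustrative seed address
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Every generated request type can be issued directly against the client.
	req := kmsg.NewPtrMetadataRequest()
	resp, err := req.RequestWith(context.Background(), cl)
	if err != nil {
		panic(err)
	}
	fmt.Println("brokers in metadata response:", len(resp.Brokers))
}
```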
| KIP | Kafka release | Status |
|-----|---------------|--------|
| KIP-1 — Disallow acks > 1 | 0.8.3 | Supported & Enforced |
| KIP-4 — Request protocol changes | 0.9.0 through 0.10.1 | Supported |
| KIP-8 — Flush method on Producer | 0.8.3 | Supported |
| KIP-12 — SASL & SSL | 0.9.0 | Supported |
| KIP-13 — Throttling (on broker) | 0.9.0 | Supported |
| KIP-15 — Close with a timeout | 0.9.0 | Supported (via context) |
| KIP-19 — Request timeouts | 0.9.0 | Supported |
| KIP-22 — Custom partitioners | 0.9.0 | Supported |
| KIP-31 — Relative offsets in message sets | 0.10.0 | Supported |
| KIP-32 — Timestamps in message set v1 | 0.10.0 | Supported |
| KIP-35 — ApiVersion | 0.10.0 | Supported |
| KIP-40 — ListGroups and DescribeGroups | 0.9.0 | Supported |
| KIP-41 — max.poll.records | 0.10.0 | Supported (via PollRecords) |
| KIP-42 — Producer & consumer interceptors | 0.10.0 | Supported via hooks |
| KIP-43 — SASL PLAIN & handshake | 0.10.0 | Supported |
| KIP-48 — Delegation tokens | 1.1 | Supported |
| KIP-54 — Sticky partitioning | 0.11.0 | Supported |
| KIP-57 — Fix lz4 | 0.10.0 | Supported |
| KIP-62 — background heartbeats & improvements | 0.10.1 | Supported |
| KIP-70 — On{Assigned,Revoked} | 0.10.1 | Supported |
| KIP-74 — Fetch response size limits | 0.10.1 | Supported |
| KIP-78 — ClusterID in Metadata | 0.10.1 | Supported |
| KIP-79 — List offsets for times | 0.10.1 | Supported |
| KIP-81 — Bound fetch memory usage | WIP | Supported (through options) |
| KIP-82 — Record headers | 0.11.0 | Supported |
| KIP-84 — SASL SCRAM | 0.10.2 | Supported |
| KIP-86 — SASL Callbacks | 0.10.2 | Supported (through callback fns) |
| KIP-88 — OffsetFetch for admins | 0.10.2 | Supported |
| KIP-91 — Intuitive producer timeouts | 2.1 | Supported (as a matter of opinion) |
| KIP-97 — Backwards compat for old brokers | 0.10.2 | Supported |
| KIP-98 — EOS | 0.11.0 | Supported |
| KIP-101 — OffsetForLeaderEpoch v0 | 0.11.0 | Supported |
| KIP-102 — Consumer close timeouts | 0.10.2 | Supported (via context) |
| KIP-107 — DeleteRecords | 0.11.0 | Supported |
| KIP-108 — CreateTopic validate only field | 0.10.2 | Supported |
| KIP-110 — zstd | 2.1 | Supported |
| KIP-112 — Broker request protocol changes | 1.0 | Supported |
| KIP-113 — LogDir requests | 1.0 | Supported |
| KIP-117 — Admin client | 0.11.0 | Supported |
| KIP-124 — Request rate quotas | 0.11.0 | Supported |
| KIP-126 — Ensure proper batch size after compression | 0.11.0 | Supported (avoided entirely) |
| KIP-133 — Describe & Alter configs | 0.11.0 | Supported |
| KIP-140 — ACLs | 0.11.0 | Supported |
| KIP-144 — Broker reconnect backoff | 0.11.0 | Supported |
| KIP-152 — More SASL; SASLAuthenticate | 1.0 | Supported |
| KIP-183 — Elect preferred leaders | 2.2 | Supported |
| KIP-185 — Idempotency is default | 1.0 | Supported |
| KIP-192 — Cleaner idempotence semantics | 1.0 | Supported |
| KIP-195 — CreatePartitions | 1.0 | Supported |
| KIP-204 — DeleteRecords via admin API | 1.1 | Supported |
| KIP-207 — New error in ListOffsets | 2.2 | Supported |
| KIP-219 — Client-side throttling | 2.0 | Supported |
| KIP-222 — Group operations via admin API | 2.0 | Supported |
| KIP-226 — Describe configs v1 | 1.1 | Supported |
| KIP-227 — Incremental fetch | 1.1 | Supported |
| KIP-229 — DeleteGroups | 1.1 | Supported |
| KIP-249 — Delegation tokens in admin API | 2.0 | Supported |
| KIP-255 — SASL OAUTHBEARER | 2.0 | Supported |
| KIP-266 — Fix indefinite consumer timeouts | 2.0 | Supported (via context) |
| KIP-279 — OffsetForLeaderEpoch bump | 2.0 | Supported |
| KIP-289 — Default group.id to null | 2.2 | Supported |
| KIP-294 — TLS verification | 2.0 | Supported (via dialer) |
| KIP-302 — Use multiple addrs for resolved hostnames | 2.1 | Supported (via dialer) |
| KIP-320 — Fetcher: detect log truncation | 2.1 | Supported |
| KIP-322 — DeleteTopics disabled error code | 2.1 | Supported |
| KIP-339 — IncrementalAlterConfigs | 2.3 | Supported |
| KIP-341 — Sticky group bugfix | ? | Supported |
| KIP-342 — OAUTHBEARER extensions | 2.1 | Supported |
| KIP-345 — Static group membership | 2.4 | Supported |
| KIP-357 — List ACLs per principal via admin API | 2.1 | Supported |
| KIP-360 — Safe epoch bumping for UNKNOWN_PRODUCER_ID | 2.5 | Supported |
| KIP-361 — Allow disable auto topic creation | 2.3 | Supported |
| KIP-368 — Periodically reauthenticate SASL | 2.2 | Supported |
| KIP-369 — An always round robin produce partitioner | 2.4 | Supported |
| KIP-373 — Users can create delegation tokens for others | 3.3 | Supported |
| KIP-380 — Inter-broker protocol changes | 2.2 | Supported |
| KIP-389 — Group max size error | 2.2 | Supported |
| KIP-392 — Closest replica fetching w/ rack | 2.2 | Supported |
| KIP-394 — Require member.id for initial join request | 2.2 | Supported |
| KIP-396 — Commit offsets manually | 2.4 | Supported |
| KIP-405 — Kafka Tiered Storage | 3.5 | Supported (protos) |
| KIP-412 — Dynamic log levels w/ IncrementalAlterConfigs | 2.4 | Supported |
| KIP-429 — Incremental rebalance (see KAFKA-8179) | 2.4 | Supported |
| KIP-430 — Authorized ops in DescribeGroups | 2.3 | Supported |
| KIP-447 — Producer scalability for EOS | 2.5 | Supported |
| KIP-455 — Replica reassignment API | 2.4 | Supported |
| KIP-460 — Leader election API | 2.4 | Supported |
| KIP-464 — CreateTopic defaults | 2.4 | Supported |
| KIP-467 — Per-record error codes when producing | 2.4 | Supported (and ignored) |
| KIP-480 — Sticky partition producing | 2.4 | Supported |
| KIP-482 — Tagged fields (KAFKA-8885) | 2.4 | Supported |
| KIP-496 — OffsetDelete admin command | 2.4 | Supported |
| KIP-497 — New AlterISR API | 2.7 | Supported |
| KIP-498 — Max bound on reads | ? | Supported |
| KIP-511 — Client name/version in ApiVersions request | 2.4 | Supported |
| KIP-514 — Bounded Flush | 2.4 | Supported (via context) |
| KIP-516 — Topic IDs | 2.8 | Supported |
| KIP-518 — List groups by state | 2.6 | Supported |
| KIP-519 — Configurable SSL "engine" | 2.6 | Supported (via dialer) |
| KIP-525 — CreateTopics v5 returns configs | 2.4 | Supported |
| KIP-526 — Reduce metadata lookups | 2.5 | Supported |
| KIP-533 — Default API timeout (total time, not per request) | 2.5 | Supported (via RetryTimeout) |
| KIP-546 — Client Quota APIs | 2.5 | Supported |
| KIP-554 — Broker side SCRAM APIs | 2.7 | Supported |
| KIP-559 — Protocol info in sync/join | 2.5 | Supported |
| KIP-568 — Explicit rebalance triggering on the consumer | 2.6 | Supported |
| KIP-569 — Docs & type in DescribeConfigs | 2.6 | Supported |
| KIP-570 — Leader epoch in StopReplica | 2.6 | Supported |
| KIP-580 — Exponential backoff | 2.6 | Supported |
| KIP-584 — Versioning scheme for features | 2.6 | Supported |
| KIP-588 — Producer recovery from txn timeout | 2.7 | Supported |
| KIP-590 — Envelope (broker only) | 2.7 | Supported |
| KIP-595 — New APIs for raft protocol | 2.7 | Supported |
| KIP-599 — Throttling on create/delete topic/partition | 2.7 | Supported |
| KIP-602 — Use all resolved addrs by default | 2.6 | Supported (via dialer) |
| KIP-651 — Support PEM | 2.7 | Supported (via dialer) |
| KIP-654 — Aborted txns with unflushed data is not fatal | 2.7 | Supported (default behavior) |
| KIP-664 — Describe producers / etc. | 2.8 (mostly) | Supported |
| KIP-679 — Strongest producer guarantee by default | 3.0 | Supported (by default always) |
| KIP-699 — Batch FindCoordinators | 3.0 | Supported |
| KIP-700 — DescribeCluster | 2.8 | Supported |
| KIP-704 — AlterISR => AlterPartition | 3.2 | Supported |
| KIP-709 — Batch OffsetFetch | 3.0 | Supported |
| KIP-714 — Client Metrics | 3.7 | Supported |
| KIP-730 — AllocateProducerIDs | 3.0 | Supported |
| KIP-734 — Support MaxTimestamp in ListOffsets | 3.0 | Supported (simple version bump) |
| KIP-735 — Bump default session timeout | ? | Supported |
| KIP-778 — KRaft Upgrades (protocol changes only) | 3.2 | Supported |
| KIP-784 — Add ErrorCode to DescribeLogDirs response | 3.1 | Supported |
| KIP-792 — Generation field in consumer group protocol | 3.4 | Supported |
| KIP-794 — Better sticky partitioning | 3.3 | Supported (UniformBytesPartitioner) |
| KIP-800 — Reason in Join/Leave group | 3.1 | Supported |
| KIP-814 — SkipAssignment for static group leaders | 3.1 | Supported |
| KIP-827 — DescribeLogDirs.{Total,Usable}Bytes | 3.3 | Supported |
| KIP-836 — DescribeQuorum voter lag info | 3.3 | Supported |
| KIP-841 — AlterPartition.TopicID | 3.3 | Supported |
| KIP-848 — Next gen consumer rebalance protocol | 3.7 | Supported & hidden |
| KIP-853 — Add replica directory ID for replica fetchers | 3.9 | Supported |
| KIP-858 — JBOD in KRaft (protocol) | 3.7 | Supported |
| KIP-866 — ZK to Raft RPC changes | 3.4 | Supported |
| KIP-890 — Transactions server side defense | 3.8, 4.0 | Supported |
| KIP-893 — Nullable structs in the protocol | 3.5 | Supported |
| KIP-899 — Allow clients to rebootstrap | ? | Supported (UpdateSeedBrokers) |
| KIP-903 — Stale broker epoch fencing | 3.5 | Supported (proto) |
| KIP-919 — Admin client to KRaft, Controller registration | 3.7 | Supported |
| KIP-951 — Leader discovery optimizations | 3.7 | Supported |
| KIP-966 — Eligible leader replicas (protocol) | 3.7 | Supported |
| KIP-994 — List/Describe transactions enhancements | 3.8 (partial) | Supported |
| KIP-1000 — ListClientMetricsResources | 3.7 | Supported |
| KIP-1005 — ListOffsets w. Timestamp -5 | 3.9 | Supported |
| KIP-1022 — Formatting changes for features | 4.0 | Supported |
| KIP-1043 — Administration of groups (protocol) | 4.0 | Supported |
| KIP-1073 — DescribeCluster.IsFenced | 4.0 | Supported |
| KIP-1075 — TimeoutMillis on ListOffsets | 4.0 | Supported |
| KIP-1076 — User provided client metrics | 4.0 | Supported |
| KIP-1102 — RebootstrapRequired | 4.0 | Supported |
| KIP-1106 — Reset offset by duration | 4.0 | Skipped, unneeded |
Missing from above but included in librdkafka is: