By default, the high-level Kafka consumer (`KafkaConsumer` in C++) starts consuming at the last committed offset. If the group has no previously committed offset for the topic+partition, the consumer falls back on the topic configuration property `auto.offset.reset`. That property defaults to `latest`, so the consumer starts at the end of the partition and only new messages are consumed.
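The fallback order can be expressed as a small decision function. The following plain C sketch is a hypothetical model, not librdkafka code. `default_start_offset()` and its low/high watermark parameters are illustrative names. It accepts the `auto.offset.reset` spellings librdkafka documents (`smallest`/`earliest`/`beginning` and `largest`/`latest`/`end`), but it does not model the `error` setting.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical model, not librdkafka code: where does the high-level
 * consumer start when the application does not set an offset itself? */
static int64_t default_start_offset(bool has_committed, int64_t committed,
                                    const char *auto_offset_reset,
                                    int64_t low_watermark,
                                    int64_t high_watermark) {
        assert(low_watermark <= high_watermark);

        /* 1. A committed offset for this group+partition always wins. */
        if (has_committed)
                return committed;

        /* 2. Otherwise auto.offset.reset decides. */
        if (strcmp(auto_offset_reset, "earliest") == 0 ||
            strcmp(auto_offset_reset, "smallest") == 0 ||
            strcmp(auto_offset_reset, "beginning") == 0)
                return low_watermark;   /* oldest retained message */

        /* "latest" (the default), "largest" or "end": only new messages.
         * The "error" setting is not modelled in this sketch. */
        return high_watermark;
}
```

Consider a new group that uses the default `latest` on a partition with watermarks 100 and 5000. That group starts at offset 5000, so it only sees messages produced after it joined.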
To set the starting offset for a partition manually, use the `assign()` API. Set the start offset of each partition element to either an absolute offset (>= 0) or one of the logical offsets (`BEGINNING`, `END`, `STORED`, `TAIL(..)`). In C this is the `.offset` field of `rd_kafka_topic_partition_t`. In C++ it is `set_offset()` on `RdKafka::TopicPartition`.
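The logical offsets are plain negative numbers. The sketch below copies their values from `rdkafka.h`:

- `RD_KAFKA_OFFSET_BEGINNING` is -2.
- `RD_KAFKA_OFFSET_END` is -1.
- `RD_KAFKA_OFFSET_STORED` is -1000.
- `RD_KAFKA_OFFSET_TAIL(cnt)` is `RD_KAFKA_OFFSET_TAIL_BASE - cnt`, with a base of -2000.

It then shows how these values can be resolved against a partition's low and high watermarks. The `SKETCH_` macros and `resolve_offset()` are hypothetical. Clamping an oversized `TAIL()` count to the low watermark is an assumption of this sketch, not documented librdkafka behaviour.

```c
#include <assert.h>
#include <stdint.h>

/* Values mirror librdkafka's rdkafka.h; the SKETCH_ names are hypothetical. */
#define SKETCH_OFFSET_BEGINNING  (-2)     /* RD_KAFKA_OFFSET_BEGINNING */
#define SKETCH_OFFSET_END        (-1)     /* RD_KAFKA_OFFSET_END */
#define SKETCH_OFFSET_STORED     (-1000)  /* RD_KAFKA_OFFSET_STORED */
#define SKETCH_OFFSET_TAIL_BASE  (-2000)  /* RD_KAFKA_OFFSET_TAIL_BASE */
#define SKETCH_OFFSET_TAIL(cnt)  (SKETCH_OFFSET_TAIL_BASE - (cnt))

/* Hypothetical resolver: map an absolute or logical offset to an absolute
 * one, given the partition's low (lo) and high (hi) watermarks.
 * STORED depends on the group's committed offset, so it is returned
 * unchanged for the caller to handle. */
static int64_t resolve_offset(int64_t offset, int64_t lo, int64_t hi) {
        assert(lo <= hi);

        if (offset >= 0)
                return offset;                  /* already absolute */
        if (offset == SKETCH_OFFSET_BEGINNING)
                return lo;
        if (offset == SKETCH_OFFSET_END)
                return hi;
        if (offset <= SKETCH_OFFSET_TAIL_BASE) {
                /* Undo TAIL(cnt) encoding: cnt = BASE - offset. */
                int64_t cnt = SKETCH_OFFSET_TAIL_BASE - offset;
                int64_t start = hi - cnt;
                /* Assumption of this sketch: clamp to the low watermark. */
                return start < lo ? lo : start;
        }
        return offset;                          /* STORED (or unknown) */
}
```

For instance, `SKETCH_OFFSET_TAIL(10)` encodes as -2010. On a partition whose high watermark is 5000, it resolves to 4990, which is the last ten messages.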
The following examples show how to start consuming topic "mytopic" partition 3 at offset 1234.
C, from a `rebalance_cb`:
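```c
#include <librdkafka/rdkafka.h>

/* Register with rd_kafka_conf_set_rebalance_cb() before creating the consumer. */
void my_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                      rd_kafka_topic_partition_list_t *partitions,
                      void *opaque) {
        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                rd_kafka_topic_partition_t *part;

                /* Start "mytopic" partition 3 at offset 1234; all other
                 * partitions keep the offset they were handed. */
                if ((part = rd_kafka_topic_partition_list_find(partitions,
                                                               "mytopic", 3)))
                        part->offset = 1234;

                rd_kafka_assign(rk, partitions);
        } else {
                /* Partitions revoked (or an error): clear the assignment. */
                rd_kafka_assign(rk, NULL);
        }
}
```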
Direct `assign()` with no subscription:
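```c
rd_kafka_topic_partition_list_t *partitions;

partitions = rd_kafka_topic_partition_list_new(0);
/* Add "mytopic" partition 3 and set its start offset to 1234. */
rd_kafka_topic_partition_list_add(partitions, "mytopic", 3)->offset = 1234;
rd_kafka_assign(rk, partitions);
/* The application still owns the list, so free it after assigning. */
rd_kafka_topic_partition_list_destroy(partitions);
```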
C++, from a `rebalance_cb`:
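```cpp
#include <librdkafka/rdkafkacpp.h>
#include <vector>

class ExampleRebalanceCb : public RdKafka::RebalanceCb {
 public:
  void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                     RdKafka::ErrorCode err,
                     std::vector<RdKafka::TopicPartition*> &partitions) {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      RdKafka::TopicPartition *part = NULL;

      // Find "mytopic" partition 3 in the assigned set.
      for (size_t i = 0; i < partitions.size(); i++) {
        if (partitions[i]->topic() == "mytopic" &&
            partitions[i]->partition() == 3) {
          part = partitions[i];
          break;
        }
      }

      if (part)
        part->set_offset(1234);
      consumer->assign(partitions);
    } else {
      consumer->unassign();
    }
  }
};
```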
Direct `assign()` with no subscription:
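```cpp
std::vector<RdKafka::TopicPartition*> partitions;
// "mytopic" partition 3, starting at offset 1234.
partitions.push_back(RdKafka::TopicPartition::create("mytopic", 3, 1234));
consumer->assign(partitions);
// assign() does not take ownership; free the TopicPartition objects.
RdKafka::TopicPartition::destroy(partitions);
```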
Python, from https://github.com/confluentinc/confluent-kafka-python/issues/373#issuecomment-389095624:
```python
# `consumer` is a confluent_kafka.Consumer and `mytopics` a list of topic names.
def my_on_assign(consumer, partitions):
    for p in partitions:
        # some starting offset, or use OFFSET_BEGINNING, et al.
        # the default offset is STORED which means use committed offsets, and if
        # no committed offsets are available use auto.offset.reset config (default latest)
        p.offset = 1234
    # call assign() to start fetching the given partitions.
    consumer.assign(partitions)

consumer.subscribe(mytopics, on_assign=my_on_assign)
```