Manually setting the consumer start offset (confluentinc/librdkafka wiki)

By default, the high-level Kafka consumer (KafkaConsumer in C++) starts consuming at the last committed offset. If there is no previously committed offset for the topic+partition and consumer group, it falls back on the topic configuration property auto.offset.reset, which defaults to latest: consumption then starts at the end of the partition, so only new messages are consumed.
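The fallback rule above can be sketched as a small, simplified C model. It assumes the partition's low watermark (oldest retained offset) and high watermark (offset the next produced message will get) are already known, and that earliest maps to the low watermark while latest maps to the high watermark. The names `reset_policy_t` and `start_offset_for()` are hypothetical and not part of librdkafka, which performs this resolution internally; the model also ignores the case where a committed offset is out of range.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of the auto.offset.reset values (not a librdkafka type). */
typedef enum { RESET_EARLIEST, RESET_LATEST } reset_policy_t;

/* Simplified model: return the offset a group consumer would start at.
 * low  = oldest offset still retained in the partition
 * high = offset that the next produced message will receive */
int64_t start_offset_for(bool has_committed, int64_t committed,
                         reset_policy_t reset, int64_t low, int64_t high) {
    assert(low <= high);
    if (has_committed)
        return committed;   /* resume where the group left off */
    if (reset == RESET_EARLIEST)
        return low;         /* earliest: replay everything still retained */
    return high;            /* latest (the default): only new messages */
}
```

For example, a group with no committed offset on a partition holding offsets 0..999 starts at 1000 under the default policy, and at 0 with auto.offset.reset set to earliest.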

To manually set the starting offset for a partition, use the assign() API: it lets you specify the start offset of each partition by setting the .offset field of each rd_kafka_topic_partition_t element (or calling set_offset() on each RdKafka::TopicPartition in C++) to either an absolute offset (>= 0) or one of the logical offsets (BEGINNING, END, STORED, TAIL(..)).
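As a rough illustration of what the logical offsets mean, the sketch below resolves each one to an absolute position from a partition's low watermark (oldest retained offset) and high watermark (offset of the next message to be written). In librdkafka these positions are requested with RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END and RD_KAFKA_OFFSET_TAIL(cnt) and are resolved by the library itself; the enum and `resolve_start()` here are hypothetical names for a simplified model that leaves out STORED and out-of-range absolute offsets.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical model of the start positions; these are NOT librdkafka's
 * RD_KAFKA_OFFSET_* constants. */
typedef enum { POS_ABSOLUTE, POS_BEGINNING, POS_END, POS_TAIL } start_kind_t;

/* Resolve a requested start position to an absolute offset.
 * POS_ABSOLUTE: `value` is the offset itself.
 * POS_TAIL:     `value` is the number of messages back from the end.
 * STORED is not modelled: it depends on the group's committed offset,
 * not on the watermarks. */
int64_t resolve_start(start_kind_t kind, int64_t value, int64_t low, int64_t high) {
    assert(low <= high && value >= 0);
    switch (kind) {
    case POS_BEGINNING:
        return low;
    case POS_END:
        return high;
    case POS_TAIL:
        /* Model assumption: clamp to low if fewer than `value` messages exist. */
        return (high - value < low) ? low : high - value;
    case POS_ABSOLUTE:
    default:
        return value;
    }
}
```

With low = 100 and high = 5000, TAIL(10) starts at 4990, BEGINNING at 100 and END at 5000, where only messages produced after the assignment are read.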

The following examples show how to start consuming topic "mytopic", partition 3, at offset 1234.

C, from a rebalance_cb:

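#include <librdkafka/rdkafka.h>

/* Registered with rd_kafka_conf_set_rebalance_cb() before the consumer is created. */
void my_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                      rd_kafka_topic_partition_list_t *partitions, void *opaque) {
    if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
        rd_kafka_topic_partition_t *part;
        /* Override the start offset of "mytopic" partition 3 only; the other
         * partitions keep their default and start from the committed offset. */
        if ((part = rd_kafka_topic_partition_list_find(partitions, "mytopic", 3)))
            part->offset = 1234;
        rd_kafka_assign(rk, partitions);
    } else {
        /* Partitions revoked: drop the current assignment. */
        rd_kafka_assign(rk, NULL);
    }
}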

Direct assign() with no subscription:

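/* rk is a consumer instance created with rd_kafka_new(RD_KAFKA_CONSUMER, ...). */
rd_kafka_topic_partition_list_t *partitions;
partitions = rd_kafka_topic_partition_list_new(0);
rd_kafka_topic_partition_list_add(partitions, "mytopic", 3)->offset = 1234;
rd_kafka_assign(rk, partitions);
/* The application still owns the list and must destroy it. */
rd_kafka_topic_partition_list_destroy(partitions);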

C++, from a rebalance_cb:

#include <librdkafka/rdkafkacpp.h>
#include <string>
#include <vector>

class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
  void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                     RdKafka::ErrorCode err,
                     std::vector<RdKafka::TopicPartition*> &partitions) {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      // Find "mytopic" partition 3 in the new assignment and override its start offset.
      for (RdKafka::TopicPartition *part : partitions) {
        if (part->topic() == "mytopic" && part->partition() == 3)
          part->set_offset(1234);
      }
      consumer->assign(partitions);
    } else {
      // Partitions revoked: drop the current assignment.
      consumer->unassign();
    }
  }
};

Direct assign() with no subscription:

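std::vector<RdKafka::TopicPartition*> partitions;
partitions.push_back(RdKafka::TopicPartition::create("mytopic", 3, 1234));
consumer->assign(partitions);
// The TopicPartition objects are owned by the application: free them.
RdKafka::TopicPartition::destroy(partitions);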

Python (confluent-kafka-python), from https://github.com/confluentinc/confluent-kafka-python/issues/373#issuecomment-389095624:

def my_on_assign(consumer, partitions):
    for p in partitions:
        # Some starting offset, or use OFFSET_BEGINNING, etc.
        # The default offset is STORED, which means use the committed offsets; if
        # no committed offsets are available, the auto.offset.reset config is used (default latest).
        p.offset = 1234
    # Call assign() to start fetching the given partitions.
    consumer.assign(partitions)

# consumer is a confluent_kafka.Consumer; mytopics is a list of topic names.
consumer.subscribe(mytopics, on_assign=my_on_assign)
