Showing content from https://github.com/confluentinc/librdkafka/issues/4589 below:

rd_kafka_metadata can cause a UNKNOWN_TOPIC_OR_PART message · Issue #4589 · confluentinc/librdkafka · GitHub

Description

I am using a consumer client that subscribes with rd_kafka_subscribe and has rd_kafka_queue_io_event_enable set on its consumer queue.

This works fine until metadata is requested via rd_kafka_metadata.

If metadata is requested at any time after the subscription is made, the next poll returns a message with its error set to UNKNOWN_TOPIC_OR_PART (offset -1001) for every subscribed topic, even though the metadata call returns correct results and the subscription continues to receive messages.

If metadata is requested before any subscription is made, the poll does not return an error message.
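For context on the setup, rd_kafka_queue_io_event_enable makes librdkafka write a small payload to a file descriptor when the queue goes from empty to non-empty, so the application can wait on that descriptor in its own event loop. The following is a minimal sketch of the receiving side only, without librdkafka; the helper name wait_for_wakeup is hypothetical and not part of any library.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper: wait up to timeout_ms for a wake-up payload on the
 * read end of a pipe. Returns 1 if a wake-up was consumed, 0 on timeout,
 * -1 on error. */
int wait_for_wakeup(int read_fd, int timeout_ms) {
    struct pollfd pfd = { .fd = read_fd, .events = POLLIN };
    char buf[16];
    int ready;

    assert(read_fd >= 0);

    ready = poll(&pfd, 1, timeout_ms);
    if (ready < 0)
        return -1;              /* poll() itself failed */
    if (ready == 0)
        return 0;               /* timed out, nothing was written */
    if (!(pfd.revents & POLLIN))
        return -1;              /* hang-up or error on the descriptor */

    /* Drain the payload so the next poll() waits for a new event. */
    if (read(read_fd, buf, sizeof(buf)) <= 0)
        return -1;
    return 1;
}
```

In the reproduction program below, the read end would be spair[0]. After a wake-up, the application would typically call rd_kafka_consumer_poll with a zero timeout until it returns NULL, since a single wake-up can stand for several queued messages.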

How to reproduce

Example client program, purely to show the error message. It assumes that a topic named test1 with partition 0 exists and that the broker is at localhost:9092.

gcc main.c -lrdkafka -lz -lpthread -lssl

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <librdkafka/rdkafka.h>

int main(int argc,char** argv){
    rd_kafka_t *rk=0;
    rd_kafka_conf_t *conf=0;
    char *brokers = "localhost:9092";
    rd_kafka_queue_t *queue=0;
    rd_kafka_resp_err_t err=0;
    rd_kafka_topic_partition_list_t *t_partition;
    rd_kafka_message_t *msg;
    char errstr[512];
    int spair[2];

    if(pipe(spair)==-1){
        fprintf(stderr, "pipe fail\n");
        exit(1);
    }

    conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    if (rd_kafka_conf_set(conf, "group.id","0", errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    if (rd_kafka_conf_set(conf, "debug","all", errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,sizeof(errstr)))) {
      fprintf(stderr,"Failed to create new consumer: %s\n",errstr);
      exit(1);
    }

    err=rd_kafka_poll_set_consumer(rk);
    if(RD_KAFKA_RESP_ERR_NO_ERROR != err){
        fprintf(stderr,"rd_kafka_poll_set_consumer err %s\n",rd_kafka_err2str(err));
        exit(1);
    }
    queue = rd_kafka_queue_get_consumer(rk);
    if (!queue){
        fprintf(stderr, "queue fail\n");
        exit(1);
    }
    rd_kafka_queue_io_event_enable(queue,spair[1],"X",1);
    rd_kafka_set_log_queue(rk,NULL);

    ///////// subscribe
    printf("Subscribing...\n");
    t_partition = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(t_partition,"test1",RD_KAFKA_PARTITION_UA);
    if(RD_KAFKA_RESP_ERR_NO_ERROR != (err= rd_kafka_subscribe(rk, t_partition))){
        fprintf(stderr,"rd_kafka_subscribe err %s\n",rd_kafka_err2str(err));
        exit(1);
    }
    rd_kafka_topic_partition_list_destroy(t_partition);

    //////// metadata (comment out this section to prevent error)
    {
        printf("Get metadata...\n");
        const struct rd_kafka_metadata *metadata;
        err = rd_kafka_metadata(rk, 1, 0, &metadata,5000);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            fprintf(stderr,"Failed to acquire metadata: %s\n",rd_kafka_err2str(err));
            exit(1);
        }
        rd_kafka_metadata_destroy(metadata);
    }

    ///////// poll
    printf("Poll...\n");
    while((msg= rd_kafka_consumer_poll(rk, 10000))) {
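        /* msg->rkt can be NULL for errors that are not tied to a topic */
        const char *topic = msg->rkt ? rd_kafka_topic_name(msg->rkt) : "(none)";
        /* offset is int64_t, so cast it to match the %lld format */
        if (msg->err)
            fprintf(stderr,"Got error msg %d (%s) [topic %s offset %lld]\n",msg->err,rd_kafka_err2name(msg->err),topic,(long long)msg->offset);
        else
            fprintf(stderr,"Got msg [topic %s offset %lld]\n",topic,(long long)msg->offset);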
        rd_kafka_message_destroy(msg);
    }

    printf("Finished...\n");

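    /* release the consumer, the queue reference and the client handle */
    rd_kafka_consumer_close(rk);
    rd_kafka_queue_destroy(queue);
    rd_kafka_destroy(rk);

    return 0;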
}

Output:

Subscribing...
Get metadata...
Poll...
Got error msg 3 (UNKNOWN_TOPIC_OR_PART) [topic test1 offset -1001]
Finished...

If you edit the code so that the metadata request comes before the subscription, the error message no longer appears.
The debug log contains a TOPICERR entry only when metadata is requested after the subscription.
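Because the subscription keeps delivering messages after this error, a client affected by it may want to tell the error event apart from real records and from other failures. The offset -1001 in the output matches RD_KAFKA_OFFSET_INVALID in rdkafka.h, which suggests the message is an error event rather than a consumed record. The sketch below is one possible way to classify poll results, not a fix for the underlying behaviour. It mirrors the relevant constants so that it compiles without librdkafka; the SKETCH_ names, poll_kind_t and classify_poll_result are hypothetical.

```c
#include <stdint.h>

/* Values mirrored from rdkafka.h so the sketch compiles on its own.
 * Real code should include <librdkafka/rdkafka.h> and use the named
 * constants given in the comments instead. */
#define SKETCH_ERR_NO_ERROR              0        /* RD_KAFKA_RESP_ERR_NO_ERROR */
#define SKETCH_ERR_UNKNOWN_TOPIC_OR_PART 3        /* RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART */
#define SKETCH_OFFSET_INVALID            (-1001)  /* RD_KAFKA_OFFSET_INVALID */

typedef enum {
    POLL_RECORD,             /* a normal consumed message */
    POLL_TOPIC_ERROR_EVENT,  /* the error event described in this issue */
    POLL_OTHER_ERROR         /* any other error, handle as usual */
} poll_kind_t;

/* Classify a poll result from the err and offset fields of the message. */
poll_kind_t classify_poll_result(int err, int64_t offset) {
    if (err == SKETCH_ERR_NO_ERROR)
        return POLL_RECORD;
    if (err == SKETCH_ERR_UNKNOWN_TOPIC_OR_PART &&
        offset == SKETCH_OFFSET_INVALID)
        return POLL_TOPIC_ERROR_EVENT;
    return POLL_OTHER_ERROR;
}
```

In the poll loop of the reproduction program this would be called as classify_poll_result(msg->err, msg->offset). Whether it is safe to ignore POLL_TOPIC_ERROR_EVENT depends on the application, since the same error code is also reported when a topic really does not exist.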

