I am using a consumer client that has rd_kafka_queue_io_event_enable set up on its queue and that subscribes using rd_kafka_subscribe.
This works fine until metadata is requested via rd_kafka_metadata.
If the metadata is requested any time after making the subscription, the poll returns a message with error set to UNKNOWN_TOPIC_OR_PART (offset -1001) for every topic subscribed to (although the metadata returns correct results and the subscription continues getting messages).
If the metadata is requested prior to making any subscription, I don't get an error message from the poll.
How to reproduce
Example client program (assuming a topic called test1 with partition 0 exists on broker localhost:9092), purely to show the error message.
gcc main.c -lrdkafka -lz -lpthread -lssl
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <librdkafka/rdkafka.h> int main(int argc,char** argv){ rd_kafka_t *rk=0; rd_kafka_conf_t *conf=0; char *brokers = "localhost:9092"; rd_kafka_queue_t *queue=0; rd_kafka_resp_err_t err=0; rd_kafka_topic_partition_list_t *t_partition; rd_kafka_message_t *msg; char errstr[512]; int spair[2]; if(pipe(spair)==-1){ fprintf(stderr, "pipe fail\n"); exit(1); } conf = rd_kafka_conf_new(); if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%% %s\n", errstr); exit(1); } if (rd_kafka_conf_set(conf, "group.id","0", errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%% %s\n", errstr); exit(1); } if (rd_kafka_conf_set(conf, "debug","all", errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%% %s\n", errstr); exit(1); } if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,sizeof(errstr)))) { fprintf(stderr,"Failed to create new consumer: %s\n",errstr); exit(1); } err=rd_kafka_poll_set_consumer(rk); if(RD_KAFKA_RESP_ERR_NO_ERROR != err){ fprintf(stderr,"rd_kafka_poll_set_consumer err %s\n",rd_kafka_err2str(err)); exit(1); } queue = rd_kafka_queue_get_consumer(rk); if (!queue){ fprintf(stderr, "queue fail\n"); exit(1); } rd_kafka_queue_io_event_enable(queue,spair[1],"X",1); rd_kafka_set_log_queue(rk,NULL); ///////// subscribe printf("Subscribing...\n"); t_partition = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(t_partition,"test1",RD_KAFKA_PARTITION_UA); if(RD_KAFKA_RESP_ERR_NO_ERROR != (err= rd_kafka_subscribe(rk, t_partition))){ fprintf(stderr,"rd_kafka_subscribe err %s\n",rd_kafka_err2str(err)); exit(1); } rd_kafka_topic_partition_list_destroy(t_partition); //////// metadata (comment out this section to prevent error) { printf("Get metadata...\n"); const struct rd_kafka_metadata *metadata; err = rd_kafka_metadata(rk, 1, 0, &metadata,5000); if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { 
fprintf(stderr,"Failed to acquire metadata: %s\n",rd_kafka_err2str(err)); exit(1); } rd_kafka_metadata_destroy(metadata); } ///////// poll printf("Poll...\n"); while((msg= rd_kafka_consumer_poll(rk, 10000))) { if (msg->err) fprintf(stderr,"Got error msg %d (%s) [topic %s offset %lld]\n",msg->err,rd_kafka_err2name(msg->err),rd_kafka_topic_name(msg->rkt),msg->offset); else fprintf(stderr,"Got msg [topic %s offset %lld]\n",rd_kafka_topic_name(msg->rkt),msg->offset); rd_kafka_message_destroy(msg); } printf("Finished...\n"); return 1; }
outputs
Subscribing...
Get metadata...
Poll...
Got error msg 3 (UNKNOWN_TOPIC_OR_PART) [topic test1 offset -1001]
Finished...
If you edit the code to put the metadata request before the subscription, the error message no longer appears.
The debug log gets a TOPICERR entry only when metadata is requested after the subscription.
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
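- librdkafka version: 2.3.0
- Apache Kafka version: 7.0.1
- librdkafka client configuration: see example above
- Operating system: OSX (x64)
- Provide logs (with debug=.. as necessary) from librdkafka

RetroSearch is an open source project built by @garambo | Open a GitHub Issue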
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4