Due to librdkafka's async nature, it is not trivial to implement a synchronic produce interface in the library that can be used in parallel to the standard async interface.
The best solution at this time is to provide a sync interface in your application, which fortunately is a trivial thing to do.
Note: The code has data race, this is just a sample.
This function produces a message, provides a local stack variable as the message opaque, and waits for the stack variable to change value by calling rd_kafka_poll()
that in turn will call the delivery report callback sync_produce_dr_cb()
.
rd_kafka_resp_err_t sync_produce (rd_kafka_topic_t *rkt, int32_t partition, void *payload, size_t len, const void *key, size_t keylen) { rd_kafka_resp_err_t err = -12345; if (rd_kafka_produce(rkt, partition, 0, payload, len, key, keylen, &err) == -1) return rd_kafka_errno2err(errno); while (err == -12345) rd_kafka_poll(rk, 1000); return err; }
Simply stores the result of the produce request in the provided errp
pointer, this errp
pointer is pointing to the err
variable in msg_delivered()
.
static void msg_delivered (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { if (rkmessage->_private) { rd_kafka_resp_err_t *errp = (rd_kafka_resp_err_t *)rkmessage->_private; *errp = rkmessage->err; } }
Application's initialization code for librdkafka must set up the delivery report callback.
void my_init_code (void) { char errstr[512]; rd_kafka_conf_t *rk_conf = rd_kafka_conf_new(); /* Set delivery report callback */ rd_kafka_conf_set_dr_msg_cb(rk_conf, msg_delivered); /* Minimize wait-for-larger-batch delay (since there will be no batching) */ rd_kafka_conf_set(rk_conf, "queue.buffering.max.ms", "1", errstr, sizeof(errstr)); /* Minimize wait-for-socket delay (otherwise you will lose 100ms per message instead just the RTT) */ rd_kafka_conf_set(rk_conf, "socket.blocking.max.ms", "1", errstr, sizeof(errstr)); rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errstr, sizeof(errstr)); if (!rk) ERROR(errstr); /* create topics, etc.. */ }
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4