2
2
* librdkafka - Apache Kafka C library
3
3
*
4
4
* Copyright (c) 2012-2022, Magnus Edenhill
5
+
* 2023 Confluent Inc.
5
6
* All rights reserved.
6
7
*
7
8
* Redistribution and use in source and binary forms, with or without
@@ -457,10 +458,12 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
457
458
{_RK_GLOBAL, "topic.metadata.refresh.fast.interval.ms", _RK_C_INT,
458
459
_RK(metadata_refresh_fast_interval_ms),
459
460
"When a topic loses its leader a new metadata request will be "
460
-
"enqueued with this initial interval, exponentially increasing "
461
+
"enqueued immediately and then with this initial interval, exponentially "
462
+
"increasing upto `retry.backoff.max.ms`, "
461
463
"until the topic metadata has been refreshed. "
464
+
"If not set explicitly, it will be defaulted to `retry.backoff.ms`. "
462
465
"This is used to recover quickly from transitioning leader brokers.",
463
-
1, 60 * 1000, 250},
466
+
1, 60 * 1000, 100},
464
467
{_RK_GLOBAL | _RK_DEPRECATED, "topic.metadata.refresh.fast.cnt", _RK_C_INT,
465
468
_RK(metadata_refresh_fast_cnt), "No longer used.", 0, 1000, 10},
466
469
{_RK_GLOBAL, "topic.metadata.refresh.sparse", _RK_C_BOOL,
@@ -1372,10 +1375,21 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
1372
1375
0, INT32_MAX, INT32_MAX},
1373
1376
{_RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS,
1374
1377
.sdef = "message.send.max.retries"},
1378
+
1375
1379
{_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.ms", _RK_C_INT,
1376
1380
_RK(retry_backoff_ms),
1377
-
"The backoff time in milliseconds before retrying a protocol request.", 1,
1378
-
300 * 1000, 100},
1381
+
"The backoff time in milliseconds before retrying a protocol request, "
1382
+
"this is the first backoff time, "
1383
+
"and will be backed off exponentially until number of retries is "
1384
+
"exhausted, and it's capped by retry.backoff.max.ms.",
1385
+
1, 300 * 1000, 100},
1386
+
1387
+
{_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.max.ms", _RK_C_INT,
1388
+
_RK(retry_backoff_max_ms),
1389
+
"The max backoff time in milliseconds before retrying a protocol request, "
1390
+
"this is the atmost backoff allowed for exponentially backed off "
1391
+
"requests.",
1392
+
1, 300 * 1000, 1000},
1379
1393
1380
1394
{_RK_GLOBAL | _RK_PRODUCER, "queue.buffering.backpressure.threshold",
1381
1395
_RK_C_INT, _RK(queue_backpressure_thres),
@@ -3928,6 +3942,10 @@ const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype,
3928
3942
conf->sparse_connect_intvl =
3929
3943
RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms / 2, 1000));
3930
3944
}
3945
+
if (!rd_kafka_conf_is_modified(
3946
+
conf, "topic.metadata.refresh.fast.interval.ms"))
3947
+
conf->metadata_refresh_fast_interval_ms =
3948
+
conf->retry_backoff_ms;
3931
3949
3932
3950
if (!rd_kafka_conf_is_modified(conf, "connections.max.idle.ms") &&
3933
3951
conf->brokerlist && rd_strcasestr(conf->brokerlist, "azure")) {
@@ -4116,6 +4134,31 @@ int rd_kafka_conf_warn(rd_kafka_t *rk) {
4116
4134
"recommend not using set_default_topic_conf");
4117
4135
4118
4136
/* Additional warnings */
4137
+
if (rk->rk_conf.retry_backoff_ms > rk->rk_conf.retry_backoff_max_ms) {
4138
+
rd_kafka_log(
4139
+
rk, LOG_WARNING, "CONFWARN",
4140
+
"Configuration `retry.backoff.ms` with value %d is greater "
4141
+
"than configuration `retry.backoff.max.ms` with value %d. "
4142
+
"A static backoff with value `retry.backoff.max.ms` will "
4143
+
"be applied.",
4144
+
rk->rk_conf.retry_backoff_ms,
4145
+
rk->rk_conf.retry_backoff_max_ms);
4146
+
}
4147
+
4148
+
if (rd_kafka_conf_is_modified(
4149
+
&rk->rk_conf, "topic.metadata.refresh.fast.interval.ms") &&
4150
+
rk->rk_conf.metadata_refresh_fast_interval_ms >
4151
+
rk->rk_conf.retry_backoff_max_ms) {
4152
+
rd_kafka_log(
4153
+
rk, LOG_WARNING, "CONFWARN",
4154
+
"Configuration `topic.metadata.refresh.fast.interval.ms` "
4155
+
"with value %d is greater than configuration "
4156
+
"`retry.backoff.max.ms` with value %d. "
4157
+
"A static backoff with value `retry.backoff.max.ms` will "
4158
+
"be applied.",
4159
+
rk->rk_conf.metadata_refresh_fast_interval_ms,
4160
+
rk->rk_conf.retry_backoff_max_ms);
4161
+
}
4119
4162
if (rk->rk_type == RD_KAFKA_CONSUMER) {
4120
4163
if (rk->rk_conf.fetch_wait_max_ms + 1000 >
4121
4164
rk->rk_conf.socket_timeout_ms)
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4