A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://github.com/confluentinc/librdkafka/commit/6dc7c71e6a4f1895fa4a95b25769840702cec79d below:

[KIP-580] Exponential Backoff with Mock Broker Changes to Automate Te… · confluentinc/librdkafka@6dc7c71 · GitHub

2 2

* librdkafka - Apache Kafka C library

3 3

*

4 4

* Copyright (c) 2012-2022, Magnus Edenhill

5 +

* 2023 Confluent Inc.

5 6

* All rights reserved.

6 7

*

7 8

* Redistribution and use in source and binary forms, with or without

@@ -457,10 +458,12 @@ static const struct rd_kafka_property rd_kafka_properties[] = {

457 458

{_RK_GLOBAL, "topic.metadata.refresh.fast.interval.ms", _RK_C_INT,

458 459

_RK(metadata_refresh_fast_interval_ms),

459 460

"When a topic loses its leader a new metadata request will be "

460 -

"enqueued with this initial interval, exponentially increasing "

461 +

"enqueued immediately and then with this initial interval, exponentially "

462 +

"increasing upto `retry.backoff.max.ms`, "

461 463

"until the topic metadata has been refreshed. "

464 +

"If not set explicitly, it will be defaulted to `retry.backoff.ms`. "

462 465

"This is used to recover quickly from transitioning leader brokers.",

463 -

1, 60 * 1000, 250},

466 +

1, 60 * 1000, 100},

464 467

{_RK_GLOBAL | _RK_DEPRECATED, "topic.metadata.refresh.fast.cnt", _RK_C_INT,

465 468

_RK(metadata_refresh_fast_cnt), "No longer used.", 0, 1000, 10},

466 469

{_RK_GLOBAL, "topic.metadata.refresh.sparse", _RK_C_BOOL,

@@ -1372,10 +1375,21 @@ static const struct rd_kafka_property rd_kafka_properties[] = {

1372 1375

0, INT32_MAX, INT32_MAX},

1373 1376

{_RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS,

1374 1377

.sdef = "message.send.max.retries"},

1378 + 1375 1379

{_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.ms", _RK_C_INT,

1376 1380

_RK(retry_backoff_ms),

1377 -

"The backoff time in milliseconds before retrying a protocol request.", 1,

1378 -

300 * 1000, 100},

1381 +

"The backoff time in milliseconds before retrying a protocol request, "

1382 +

"this is the first backoff time, "

1383 +

"and will be backed off exponentially until number of retries is "

1384 +

"exhausted, and it's capped by retry.backoff.max.ms.",

1385 +

1, 300 * 1000, 100},

1386 + 1387 +

{_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.max.ms", _RK_C_INT,

1388 +

_RK(retry_backoff_max_ms),

1389 +

"The max backoff time in milliseconds before retrying a protocol request, "

1390 +

"this is the atmost backoff allowed for exponentially backed off "

1391 +

"requests.",

1392 +

1, 300 * 1000, 1000},

1379 1393 1380 1394

{_RK_GLOBAL | _RK_PRODUCER, "queue.buffering.backpressure.threshold",

1381 1395

_RK_C_INT, _RK(queue_backpressure_thres),

@@ -3928,6 +3942,10 @@ const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype,

3928 3942

conf->sparse_connect_intvl =

3929 3943

RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms / 2, 1000));

3930 3944

}

3945 +

if (!rd_kafka_conf_is_modified(

3946 +

conf, "topic.metadata.refresh.fast.interval.ms"))

3947 +

conf->metadata_refresh_fast_interval_ms =

3948 +

conf->retry_backoff_ms;

3931 3949 3932 3950

if (!rd_kafka_conf_is_modified(conf, "connections.max.idle.ms") &&

3933 3951

conf->brokerlist && rd_strcasestr(conf->brokerlist, "azure")) {

@@ -4116,6 +4134,31 @@ int rd_kafka_conf_warn(rd_kafka_t *rk) {

4116 4134

"recommend not using set_default_topic_conf");

4117 4135 4118 4136

/* Additional warnings */

4137 +

if (rk->rk_conf.retry_backoff_ms > rk->rk_conf.retry_backoff_max_ms) {

4138 +

rd_kafka_log(

4139 +

rk, LOG_WARNING, "CONFWARN",

4140 +

"Configuration `retry.backoff.ms` with value %d is greater "

4141 +

"than configuration `retry.backoff.max.ms` with value %d. "

4142 +

"A static backoff with value `retry.backoff.max.ms` will "

4143 +

"be applied.",

4144 +

rk->rk_conf.retry_backoff_ms,

4145 +

rk->rk_conf.retry_backoff_max_ms);

4146 +

}

4147 + 4148 +

if (rd_kafka_conf_is_modified(

4149 +

&rk->rk_conf, "topic.metadata.refresh.fast.interval.ms") &&

4150 +

rk->rk_conf.metadata_refresh_fast_interval_ms >

4151 +

rk->rk_conf.retry_backoff_max_ms) {

4152 +

rd_kafka_log(

4153 +

rk, LOG_WARNING, "CONFWARN",

4154 +

"Configuration `topic.metadata.refresh.fast.interval.ms` "

4155 +

"with value %d is greater than configuration "

4156 +

"`retry.backoff.max.ms` with value %d. "

4157 +

"A static backoff with value `retry.backoff.max.ms` will "

4158 +

"be applied.",

4159 +

rk->rk_conf.metadata_refresh_fast_interval_ms,

4160 +

rk->rk_conf.retry_backoff_max_ms);

4161 +

}

4119 4162

if (rk->rk_type == RD_KAFKA_CONSUMER) {

4120 4163

if (rk->rk_conf.fetch_wait_max_ms + 1000 >

4121 4164

rk->rk_conf.socket_timeout_ms)


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4