A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://github.com/confluentinc/librdkafka/commit/6584ed7c8b00786121c07bc0df5b3d7fa8da2661 below:

Fix for an undesired partition migration with stale leader epoch (#4680) · confluentinc/librdkafka@6584ed7 · GitHub

@@ -469,7 +469,39 @@ rd_kafka_mock_partition_assign_replicas(rd_kafka_mock_partition_t *mpart,

469 469

mpart, mpart->replicas[rd_jitter(0, replica_cnt - 1)]);

470 470

}

471 471 472 +

/**

473 +

* @brief Push a partition leader response to passed \p mpart .

474 +

*/

475 +

static void

476 +

rd_kafka_mock_partition_push_leader_response0(rd_kafka_mock_partition_t *mpart,

477 +

int32_t leader_id,

478 +

int32_t leader_epoch) {

479 +

rd_kafka_mock_partition_leader_t *leader_response;

480 + 481 +

leader_response = rd_calloc(1, sizeof(*leader_response));

482 +

leader_response->leader_id = leader_id;

483 +

leader_response->leader_epoch = leader_epoch;

484 +

TAILQ_INSERT_TAIL(&mpart->leader_responses, leader_response, link);

485 +

}

472 486 487 +

/**

488 +

* @brief Return the first mocked partition leader response in \p mpart ,

489 +

* if available.

490 +

*/

491 +

rd_kafka_mock_partition_leader_t *

492 +

rd_kafka_mock_partition_next_leader_response(rd_kafka_mock_partition_t *mpart) {

493 +

return TAILQ_FIRST(&mpart->leader_responses);

494 +

}

495 + 496 +

/**

497 +

* @brief Unlink and destroy a partition leader response

498 +

*/

499 +

void rd_kafka_mock_partition_leader_destroy(

500 +

rd_kafka_mock_partition_t *mpart,

501 +

rd_kafka_mock_partition_leader_t *mpart_leader) {

502 +

TAILQ_REMOVE(&mpart->leader_responses, mpart_leader, link);

503 +

rd_free(mpart_leader);

504 +

}

473 505 474 506

/**

475 507

* @brief Unlink and destroy committed offset

@@ -546,13 +578,18 @@ rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,

546 578

static void rd_kafka_mock_partition_destroy(rd_kafka_mock_partition_t *mpart) {

547 579

rd_kafka_mock_msgset_t *mset, *tmp;

548 580

rd_kafka_mock_committed_offset_t *coff, *tmpcoff;

581 +

rd_kafka_mock_partition_leader_t *mpart_leader, *tmp_mpart_leader;

549 582 550 583

TAILQ_FOREACH_SAFE(mset, &mpart->msgsets, link, tmp)

551 584

rd_kafka_mock_msgset_destroy(mpart, mset);

552 585 553 586

TAILQ_FOREACH_SAFE(coff, &mpart->committed_offsets, link, tmpcoff)

554 587

rd_kafka_mock_committed_offset_destroy(mpart, coff);

555 588 589 +

TAILQ_FOREACH_SAFE(mpart_leader, &mpart->leader_responses, link,

590 +

tmp_mpart_leader)

591 +

rd_kafka_mock_partition_leader_destroy(mpart, mpart_leader);

592 + 556 593

rd_list_destroy(&mpart->pidstates);

557 594 558 595

rd_free(mpart->replicas);

@@ -579,6 +616,7 @@ static void rd_kafka_mock_partition_init(rd_kafka_mock_topic_t *mtopic,

579 616

mpart->update_follower_end_offset = rd_true;

580 617 581 618

TAILQ_INIT(&mpart->committed_offsets);

619 +

TAILQ_INIT(&mpart->leader_responses);

582 620 583 621

rd_list_init(&mpart->pidstates, 0, rd_free);

584 622

@@ -2096,6 +2134,23 @@ rd_kafka_mock_partition_set_follower_wmarks(rd_kafka_mock_cluster_t *mcluster,

2096 2134

rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));

2097 2135

}

2098 2136 2137 +

rd_kafka_resp_err_t

2138 +

rd_kafka_mock_partition_push_leader_response(rd_kafka_mock_cluster_t *mcluster,

2139 +

const char *topic,

2140 +

int partition,

2141 +

int32_t leader_id,

2142 +

int32_t leader_epoch) {

2143 +

rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);

2144 +

rko->rko_u.mock.name = rd_strdup(topic);

2145 +

rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE;

2146 +

rko->rko_u.mock.partition = partition;

2147 +

rko->rko_u.mock.leader_id = leader_id;

2148 +

rko->rko_u.mock.leader_epoch = leader_epoch;

2149 + 2150 +

return rd_kafka_op_err_destroy(

2151 +

rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));

2152 +

}

2153 + 2099 2154

rd_kafka_resp_err_t

2100 2155

rd_kafka_mock_broker_set_down(rd_kafka_mock_cluster_t *mcluster,

2101 2156

int32_t broker_id) {

@@ -2379,6 +2434,23 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,

2379 2434

mpart->update_follower_end_offset = rd_false;

2380 2435

}

2381 2436

break;

2437 +

case RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE:

2438 +

mpart = rd_kafka_mock_partition_get(

2439 +

mcluster, rko->rko_u.mock.name, rko->rko_u.mock.partition);

2440 +

if (!mpart)

2441 +

return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;

2442 + 2443 +

rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",

2444 +

"Push %s [%" PRId32 "] leader response: (%" PRId32

2445 +

", %" PRId32 ")",

2446 +

rko->rko_u.mock.name, rko->rko_u.mock.partition,

2447 +

rko->rko_u.mock.leader_id,

2448 +

rko->rko_u.mock.leader_epoch);

2449 + 2450 +

rd_kafka_mock_partition_push_leader_response0(

2451 +

mpart, rko->rko_u.mock.leader_id,

2452 +

rko->rko_u.mock.leader_epoch);

2453 +

break;

2382 2454 2383 2455

/* Broker commands */

2384 2456

case RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN:


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4