@@ -469,7 +469,39 @@ rd_kafka_mock_partition_assign_replicas(rd_kafka_mock_partition_t *mpart,
469
469
mpart, mpart->replicas[rd_jitter(0, replica_cnt - 1)]);
470
470
}
471
471
472
+
/**
473
+
* @brief Push a partition leader response to passed \p mpart .
474
+
*/
475
+
static void
476
+
rd_kafka_mock_partition_push_leader_response0(rd_kafka_mock_partition_t *mpart,
477
+
int32_t leader_id,
478
+
int32_t leader_epoch) {
479
+
rd_kafka_mock_partition_leader_t *leader_response;
480
+
481
+
leader_response = rd_calloc(1, sizeof(*leader_response));
482
+
leader_response->leader_id = leader_id;
483
+
leader_response->leader_epoch = leader_epoch;
484
+
TAILQ_INSERT_TAIL(&mpart->leader_responses, leader_response, link);
485
+
}
472
486
487
+
/**
488
+
* @brief Return the first mocked partition leader response in \p mpart ,
489
+
* if available.
490
+
*/
491
+
rd_kafka_mock_partition_leader_t *
492
+
rd_kafka_mock_partition_next_leader_response(rd_kafka_mock_partition_t *mpart) {
493
+
return TAILQ_FIRST(&mpart->leader_responses);
494
+
}
495
+
496
+
/**
497
+
* @brief Unlink and destroy a partition leader response
498
+
*/
499
+
void rd_kafka_mock_partition_leader_destroy(
500
+
rd_kafka_mock_partition_t *mpart,
501
+
rd_kafka_mock_partition_leader_t *mpart_leader) {
502
+
TAILQ_REMOVE(&mpart->leader_responses, mpart_leader, link);
503
+
rd_free(mpart_leader);
504
+
}
473
505
474
506
/**
475
507
* @brief Unlink and destroy committed offset
@@ -546,13 +578,18 @@ rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
546
578
static void rd_kafka_mock_partition_destroy(rd_kafka_mock_partition_t *mpart) {
547
579
rd_kafka_mock_msgset_t *mset, *tmp;
548
580
rd_kafka_mock_committed_offset_t *coff, *tmpcoff;
581
+
rd_kafka_mock_partition_leader_t *mpart_leader, *tmp_mpart_leader;
549
582
550
583
TAILQ_FOREACH_SAFE(mset, &mpart->msgsets, link, tmp)
551
584
rd_kafka_mock_msgset_destroy(mpart, mset);
552
585
553
586
TAILQ_FOREACH_SAFE(coff, &mpart->committed_offsets, link, tmpcoff)
554
587
rd_kafka_mock_committed_offset_destroy(mpart, coff);
555
588
589
+
TAILQ_FOREACH_SAFE(mpart_leader, &mpart->leader_responses, link,
590
+
tmp_mpart_leader)
591
+
rd_kafka_mock_partition_leader_destroy(mpart, mpart_leader);
592
+
556
593
rd_list_destroy(&mpart->pidstates);
557
594
558
595
rd_free(mpart->replicas);
@@ -579,6 +616,7 @@ static void rd_kafka_mock_partition_init(rd_kafka_mock_topic_t *mtopic,
579
616
mpart->update_follower_end_offset = rd_true;
580
617
581
618
TAILQ_INIT(&mpart->committed_offsets);
619
+
TAILQ_INIT(&mpart->leader_responses);
582
620
583
621
rd_list_init(&mpart->pidstates, 0, rd_free);
584
622
@@ -2096,6 +2134,23 @@ rd_kafka_mock_partition_set_follower_wmarks(rd_kafka_mock_cluster_t *mcluster,
2096
2134
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
2097
2135
}
2098
2136
2137
+
rd_kafka_resp_err_t
2138
+
rd_kafka_mock_partition_push_leader_response(rd_kafka_mock_cluster_t *mcluster,
2139
+
const char *topic,
2140
+
int partition,
2141
+
int32_t leader_id,
2142
+
int32_t leader_epoch) {
2143
+
rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);
2144
+
rko->rko_u.mock.name = rd_strdup(topic);
2145
+
rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE;
2146
+
rko->rko_u.mock.partition = partition;
2147
+
rko->rko_u.mock.leader_id = leader_id;
2148
+
rko->rko_u.mock.leader_epoch = leader_epoch;
2149
+
2150
+
return rd_kafka_op_err_destroy(
2151
+
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
2152
+
}
2153
+
2099
2154
rd_kafka_resp_err_t
2100
2155
rd_kafka_mock_broker_set_down(rd_kafka_mock_cluster_t *mcluster,
2101
2156
int32_t broker_id) {
@@ -2379,6 +2434,23 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,
2379
2434
mpart->update_follower_end_offset = rd_false;
2380
2435
}
2381
2436
break;
2437
+
case RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE:
2438
+
mpart = rd_kafka_mock_partition_get(
2439
+
mcluster, rko->rko_u.mock.name, rko->rko_u.mock.partition);
2440
+
if (!mpart)
2441
+
return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
2442
+
2443
+
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
2444
+
"Push %s [%" PRId32 "] leader response: (%" PRId32
2445
+
", %" PRId32 ")",
2446
+
rko->rko_u.mock.name, rko->rko_u.mock.partition,
2447
+
rko->rko_u.mock.leader_id,
2448
+
rko->rko_u.mock.leader_epoch);
2449
+
2450
+
rd_kafka_mock_partition_push_leader_response0(
2451
+
mpart, rko->rko_u.mock.leader_id,
2452
+
rko->rko_u.mock.leader_epoch);
2453
+
break;
2382
2454
2383
2455
/* Broker commands */
2384
2456
case RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN:
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4