A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://github.com/SahilKang/cl-rdkafka/commit/267e18399351ec5b6a282c9dbd9b194145ff454b below:

Merge pull request #68 from SahilKang/assign · SahilKang/cl-rdkafka@267e183 · GitHub

File tree Expand file treeCollapse file tree 9 files changed

+100

-17

lines changed

Filter options

Expand file treeCollapse file tree 9 files changed

+100

-17

lines changed Original file line number Diff line number Diff line change

@@ -1,5 +1,13 @@

1 1

# Changelog

2 2 3 +

## [1.2.0] - 2022-12-30

4 + 5 +

### Added

6 + 7 +

* Add support for offsets to `assign` and `assignment` consumer methods.

8 +

[issue #67](https://github.com/SahilKang/cl-rdkafka/issues/67) from

9 +

[@Uthar](https://github.com/Uthar).

10 + 3 11

## [1.1.0] - 2020-07-04

4 12 5 13

### Added

Original file line number Diff line number Diff line change

@@ -1,4 +1,5 @@

1 1

Copyright (C) 2018-2020 Sahil Kang <sahil.kang@asilaycomputing.com>

2 +

Copyright 2022 Google LLC

2 3 3 4

All parts of cl-rdkafka are licensed under the GNU GPLv3 as detailed below.

4 5 Original file line number Diff line number Diff line change

@@ -156,6 +156,15 @@ with any questions :incoming_envelope:

156 156 157 157

To run the tests:

158 158 159 +

:warning: Some of the following commands below, such as `--rmi` and `prune`,

160 +

will remove all local docker images and volumes. If this may be a problem,

161 +

consult the

162 +

[docker compose](https://docs.docker.com/engine/reference/commandline/compose_down/),

163 +

[docker system](https://docs.docker.com/engine/reference/commandline/system_prune/),

164 +

and

165 +

[docker volume](https://docs.docker.com/engine/reference/commandline/volume_prune/)

166 +

docs.

167 + 159 168

```bash

160 169

$ docker-compose -f ./test/docker-compose.test.yml \

161 170

> up --build --remove-orphans --abort-on-container-exit test

@@ -626,17 +635,23 @@ Return a list of topic names that `consumer` is subscribed to.

626 635 627 636

Assign `partitions` to `consumer`.

628 637 629 -

`partitions` should be a sequence of `(topic . partition)` cons cells.

638 +

`partitions` should be a sequence of either:

639 +

* `(topic . partition)` cons cells

640 +

* `((topic . partition) . offset)` cons cells

630 641 631 642

---

632 643 633 644

### assignment

634 645 635 646

```lisp

636 -

((consumer consumer))

647 +

((consumer consumer) &key offsetsp)

637 648

```

638 649 639 -

Return a `(topic . partition)` list of partitions assigned to `consumer`.

650 +

Return a list of partitions assigned to `consumer`.

651 + 652 +

The elements of the returned list will be either:

653 +

* `(topic . partition)` cons cells if `offsetsp` is nil

654 +

* `((topic . partition) . offset)` cons cells otherwise

640 655 641 656

---

642 657 Original file line number Diff line number Diff line change

@@ -6,7 +6,7 @@ services:

6 6

- "2181:2181"

7 7 8 8

kafka:

9 -

image: wurstmeister/kafka

9 +

image: wurstmeister/kafka:2.12-2.4.0

10 10

ports:

11 11

- "9092:9092"

12 12

environment:

Original file line number Diff line number Diff line change

@@ -1,4 +1,5 @@

1 1

;;; Copyright (C) 2018-2020 Sahil Kang <sahil.kang@asilaycomputing.com>

2 +

;;; Copyright 2022 Google LLC

2 3

;;;

3 4

;;; This file is part of cl-rdkafka.

4 5

;;;

@@ -88,7 +89,7 @@ Example:

88 89 89 90

(defgeneric committed (consumer partitions timeout-ms))

90 91 91 -

(defgeneric assignment (consumer))

92 +

(defgeneric assignment (consumer &key offsetsp))

92 93 93 94

(defgeneric assign (consumer partitions))

94 95

@@ -342,13 +343,20 @@ If ASYNCP is true, then a FUTURE will be returned instead."

342 343

(error (make-rdkafka-error err)))

343 344

(cffi:mem-ref rd-list :pointer))))

344 345 345 -

(defmethod assignment ((consumer consumer))

346 -

"Return a (topic . partition) list of partitions assigned to CONSUMER."

346 +

(defmethod assignment ((consumer consumer) &key offsetsp)

347 +

"Return a list of partitions assigned to CONSUMER.

348 + 349 +

The elements of the returned list will be either:

350 +

* (topic . partition) cons cells if OFFSETSP is nil

351 +

* ((topic . partition) . offset) cons cells otherwise"

347 352

(with-slots (rd-kafka-consumer) consumer

348 353

(with-toppar-list toppar-list (%assignment rd-kafka-consumer)

349 354

(let (partitions)

350 -

(foreach-toppar toppar-list (topic partition)

351 -

(push (cons topic partition) partitions))

355 +

(if offsetsp

356 +

(foreach-toppar toppar-list (topic partition offset)

357 +

(push (cons (cons topic partition) offset) partitions))

358 +

(foreach-toppar toppar-list (topic partition)

359 +

(push (cons topic partition) partitions)))

352 360

(nreverse partitions)))))

353 361 354 362

(defmethod committed

@@ -396,11 +404,29 @@ The PARTIAL-ERROR will have the slots:

396 404

(defmethod assign ((consumer consumer) (partitions sequence))

397 405

"Assign PARTITIONS to CONSUMER.

398 406 399 -

PARTITIONS should be a sequence of (topic . partition) cons cells."

407 +

PARTITIONS should be a sequence of either:

408 +

* (topic . partition) cons cells

409 +

* ((topic . partition) . offset) cons cells"

400 410

(with-slots (rd-kafka-consumer) consumer

401 411

(with-toppar-list

402 412

toppar-list

403 -

(alloc-toppar-list partitions :topic #'car :partition #'cdr)

413 +

(alloc-toppar-list

414 +

partitions

415 +

:topic (lambda (cons)

416 +

(let ((car (car cons)))

417 +

(if (consp car)

418 +

(car car)

419 +

car)))

420 +

:partition (lambda (cons)

421 +

(let ((car (car cons)))

422 +

(if (consp car)

423 +

(cdr car)

424 +

(cdr cons))))

425 +

:offset (lambda (cons)

426 +

(let ((car (car cons)))

427 +

(if (consp car)

428 +

(cdr cons)

429 +

cl-rdkafka/ll:rd-kafka-offset-invalid))))

404 430

(let ((err (cl-rdkafka/ll:rd-kafka-assign rd-kafka-consumer toppar-list)))

405 431

(unless (eq err 'cl-rdkafka/ll:rd-kafka-resp-err-no-error)

406 432

(error (make-rdkafka-error err)))))))

Original file line number Diff line number Diff line change

@@ -6,7 +6,7 @@ services:

6 6

- "2181:2181"

7 7 8 8

kafka:

9 -

image: wurstmeister/kafka

9 +

image: wurstmeister/kafka:2.12-2.4.0

10 10

ports:

11 11

- "9092:9092"

12 12

environment:

@@ -22,6 +22,6 @@ services:

22 22

dockerfile: ./test/Dockerfile.test

23 23

args:

24 24

bootstrap_servers: "kafka:9092"

25 -

librdkafka_commit: "v1.4.2"

25 +

librdkafka_commit: "v1.6.0"

26 26

links:

27 27

- kafka

Original file line number Diff line number Diff line change

@@ -1,4 +1,5 @@

1 1

;;; Copyright (C) 2018-2020 Sahil Kang <sahil.kang@asilaycomputing.com>

2 +

;;; Copyright 2022 Google LLC

2 3

;;;

3 4

;;; This file is part of cl-rdkafka.

4 5

;;;

@@ -181,6 +182,37 @@

181 182

(is (string= topic actual-topic))

182 183

(is (= partition actual-partition))))))

183 184 185 +

(test assign-offsets

186 +

(with-topics ((topic "consumer-assign-offsets-topic"))

187 +

(let ((consumer (make-instance

188 +

'kf:consumer

189 +

:conf (list "bootstrap.servers" *bootstrap-servers*

190 +

"group.id" "consumer-assign-offsets-group"

191 +

"enable.auto.commit" "true"

192 +

"auto.offset.reset" "earliest")))

193 +

(partition-1 35)

194 +

(partition-2 36)

195 +

(offset 3))

196 +

(kf:assign consumer (list (cons topic partition-1)

197 +

(cons (cons topic partition-2) offset)))

198 +

(destructuring-bind

199 +

(((actual-topic-1 . actual-partition-1) . actual-offset-1)

200 +

((actual-topic-2 . actual-partition-2) . actual-offset-2))

201 +

(kf:assignment consumer :offsetsp t)

202 +

(is (string= topic actual-topic-1))

203 +

(is (string= topic actual-topic-2))

204 +

(is (or (= partition-1 actual-partition-1)

205 +

(= partition-2 actual-partition-1)))

206 +

(if (= partition-1 actual-partition-1)

207 +

(progn

208 +

(is (= partition-2 actual-partition-2))

209 +

(is (= cl-rdkafka/ll:rd-kafka-offset-stored actual-offset-1))

210 +

(is (= offset actual-offset-2)))

211 +

(progn

212 +

(is (= partition-1 actual-partition-2))

213 +

(is (= offset actual-offset-1))

214 +

(is (= cl-rdkafka/ll:rd-kafka-offset-stored actual-offset-2))))))))

215 + 184 216

(test consumer-member-id

185 217

(with-topics ((topic "consumer-member-id"))

186 218

(let* ((group "consumer-member-id-group")

Original file line number Diff line number Diff line change

@@ -1,4 +1,5 @@

1 1

;;; Copyright (C) 2018-2020 Sahil Kang <sahil.kang@asilaycomputing.com>

2 +

;;; Copyright 2022 Google LLC

2 3

;;;

3 4

;;; This file is part of cl-rdkafka.

4 5

;;;

@@ -103,14 +104,14 @@

103 104

do (kf:send producer topic "deadlock-message")

104 105

finally (kf:flush producer))

105 106

(loop

106 -

repeat (* 2 expected-messages)

107 +

repeat (* 20 expected-messages)

107 108

until (= expected-messages actual-messages)

108 109

do (poll))))

109 110

(unwind-protect

110 -

;; pass or fail within 1 minute

111 +

;; pass or fail within 2 minutes

111 112

(loop

112 113

initially (lparallel:submit-task channel #'poll-loop)

113 -

repeat 30

114 +

repeat 60

114 115

until (= expected-messages actual-messages)

115 116

do (sleep 2)

116 117

finally (is (= expected-messages actual-messages)))

Original file line number Diff line number Diff line change

@@ -1 +1 @@

1 -

"1.1.0"

1 +

"1.2.0"

You can’t perform that action at this time.


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4