+100
-17
lines changedFilter options
+100
-17
lines changed Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
1
1
# Changelog
2
2
3
+
## [1.2.0] - 2022-12-30
4
+
5
+
### Added
6
+
7
+
* Add support for offsets to `assign` and `assignment` consumer methods.
8
+
[issue #67](https://github.com/SahilKang/cl-rdkafka/issues/67) from
9
+
[@Uthar](https://github.com/Uthar).
10
+
3
11
## [1.1.0] - 2020-07-04
4
12
5
13
### Added
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
1
1
Copyright (C) 2018-2020 Sahil Kang <sahil.kang@asilaycomputing.com>
2
+
Copyright 2022 Google LLC
2
3
3
4
All parts of cl-rdkafka are licensed under the GNU GPLv3 as detailed below.
4
5
Original file line number Diff line number Diff line change
@@ -156,6 +156,15 @@ with any questions :incoming_envelope:
156
156
157
157
To run the tests:
158
158
159
+
:warning: Some of the following commands below, such as `--rmi` and `prune`,
160
+
will remove all local docker images and volumes. If this may be a problem,
161
+
consult the
162
+
[docker compose](https://docs.docker.com/engine/reference/commandline/compose_down/),
163
+
[docker system](https://docs.docker.com/engine/reference/commandline/system_prune/),
164
+
and
165
+
[docker volume](https://docs.docker.com/engine/reference/commandline/volume_prune/)
166
+
docs.
167
+
159
168
```bash
160
169
$ docker-compose -f ./test/docker-compose.test.yml \
161
170
> up --build --remove-orphans --abort-on-container-exit test
@@ -626,17 +635,23 @@ Return a list of topic names that `consumer` is subscribed to.
626
635
627
636
Assign `partitions` to `consumer`.
628
637
629
-
`partitions` should be a sequence of `(topic . partition)` cons cells.
638
+
`partitions` should be a sequence of either:
639
+
* `(topic . partition)` cons cells
640
+
* `((topic . partition) . offset)` cons cells
630
641
631
642
---
632
643
633
644
### assignment
634
645
635
646
```lisp
636
-
((consumer consumer))
647
+
((consumer consumer) &key offsetsp)
637
648
```
638
649
639
-
Return a `(topic . partition)` list of partitions assigned to `consumer`.
650
+
Return a list of partitions assigned to `consumer`.
651
+
652
+
The elements of the returned list will be either:
653
+
* `(topic . partition)` cons cells if `offsetsp` is nil
654
+
* `((topic . partition) . offset)` cons cells otherwise
640
655
641
656
---
642
657
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ services:
6
6
- "2181:2181"
7
7
8
8
kafka:
9
-
image: wurstmeister/kafka
9
+
image: wurstmeister/kafka:2.12-2.4.0
10
10
ports:
11
11
- "9092:9092"
12
12
environment:
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
1
1
;;; Copyright (C) 2018-2020 Sahil Kang <sahil.kang@asilaycomputing.com>
2
+
;;; Copyright 2022 Google LLC
2
3
;;;
3
4
;;; This file is part of cl-rdkafka.
4
5
;;;
@@ -88,7 +89,7 @@ Example:
88
89
89
90
(defgeneric committed (consumer partitions timeout-ms))
90
91
91
-
(defgeneric assignment (consumer))
92
+
(defgeneric assignment (consumer &key offsetsp))
92
93
93
94
(defgeneric assign (consumer partitions))
94
95
@@ -342,13 +343,20 @@ If ASYNCP is true, then a FUTURE will be returned instead."
342
343
(error (make-rdkafka-error err)))
343
344
(cffi:mem-ref rd-list :pointer))))
344
345
345
-
(defmethod assignment ((consumer consumer))
346
-
"Return a (topic . partition) list of partitions assigned to CONSUMER."
346
+
(defmethod assignment ((consumer consumer) &key offsetsp)
347
+
"Return a list of partitions assigned to CONSUMER.
348
+
349
+
The elements of the returned list will be either:
350
+
* (topic . partition) cons cells if OFFSETSP is nil
351
+
* ((topic . partition) . offset) cons cells otherwise"
347
352
(with-slots (rd-kafka-consumer) consumer
348
353
(with-toppar-list toppar-list (%assignment rd-kafka-consumer)
349
354
(let (partitions)
350
-
(foreach-toppar toppar-list (topic partition)
351
-
(push (cons topic partition) partitions))
355
+
(if offsetsp
356
+
(foreach-toppar toppar-list (topic partition offset)
357
+
(push (cons (cons topic partition) offset) partitions))
358
+
(foreach-toppar toppar-list (topic partition)
359
+
(push (cons topic partition) partitions)))
352
360
(nreverse partitions)))))
353
361
354
362
(defmethod committed
@@ -396,11 +404,29 @@ The PARTIAL-ERROR will have the slots:
396
404
(defmethod assign ((consumer consumer) (partitions sequence))
397
405
"Assign PARTITIONS to CONSUMER.
398
406
399
-
PARTITIONS should be a sequence of (topic . partition) cons cells."
407
+
PARTITIONS should be a sequence of either:
408
+
* (topic . partition) cons cells
409
+
* ((topic . partition) . offset) cons cells"
400
410
(with-slots (rd-kafka-consumer) consumer
401
411
(with-toppar-list
402
412
toppar-list
403
-
(alloc-toppar-list partitions :topic #'car :partition #'cdr)
413
+
(alloc-toppar-list
414
+
partitions
415
+
:topic (lambda (cons)
416
+
(let ((car (car cons)))
417
+
(if (consp car)
418
+
(car car)
419
+
car)))
420
+
:partition (lambda (cons)
421
+
(let ((car (car cons)))
422
+
(if (consp car)
423
+
(cdr car)
424
+
(cdr cons))))
425
+
:offset (lambda (cons)
426
+
(let ((car (car cons)))
427
+
(if (consp car)
428
+
(cdr cons)
429
+
cl-rdkafka/ll:rd-kafka-offset-invalid))))
404
430
(let ((err (cl-rdkafka/ll:rd-kafka-assign rd-kafka-consumer toppar-list)))
405
431
(unless (eq err 'cl-rdkafka/ll:rd-kafka-resp-err-no-error)
406
432
(error (make-rdkafka-error err)))))))
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ services:
6
6
- "2181:2181"
7
7
8
8
kafka:
9
-
image: wurstmeister/kafka
9
+
image: wurstmeister/kafka:2.12-2.4.0
10
10
ports:
11
11
- "9092:9092"
12
12
environment:
@@ -22,6 +22,6 @@ services:
22
22
dockerfile: ./test/Dockerfile.test
23
23
args:
24
24
bootstrap_servers: "kafka:9092"
25
-
librdkafka_commit: "v1.4.2"
25
+
librdkafka_commit: "v1.6.0"
26
26
links:
27
27
- kafka
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
1
1
;;; Copyright (C) 2018-2020 Sahil Kang <sahil.kang@asilaycomputing.com>
2
+
;;; Copyright 2022 Google LLC
2
3
;;;
3
4
;;; This file is part of cl-rdkafka.
4
5
;;;
@@ -181,6 +182,37 @@
181
182
(is (string= topic actual-topic))
182
183
(is (= partition actual-partition))))))
183
184
185
+
(test assign-offsets
186
+
(with-topics ((topic "consumer-assign-offsets-topic"))
187
+
(let ((consumer (make-instance
188
+
'kf:consumer
189
+
:conf (list "bootstrap.servers" *bootstrap-servers*
190
+
"group.id" "consumer-assign-offsets-group"
191
+
"enable.auto.commit" "true"
192
+
"auto.offset.reset" "earliest")))
193
+
(partition-1 35)
194
+
(partition-2 36)
195
+
(offset 3))
196
+
(kf:assign consumer (list (cons topic partition-1)
197
+
(cons (cons topic partition-2) offset)))
198
+
(destructuring-bind
199
+
(((actual-topic-1 . actual-partition-1) . actual-offset-1)
200
+
((actual-topic-2 . actual-partition-2) . actual-offset-2))
201
+
(kf:assignment consumer :offsetsp t)
202
+
(is (string= topic actual-topic-1))
203
+
(is (string= topic actual-topic-2))
204
+
(is (or (= partition-1 actual-partition-1)
205
+
(= partition-2 actual-partition-1)))
206
+
(if (= partition-1 actual-partition-1)
207
+
(progn
208
+
(is (= partition-2 actual-partition-2))
209
+
(is (= cl-rdkafka/ll:rd-kafka-offset-stored actual-offset-1))
210
+
(is (= offset actual-offset-2)))
211
+
(progn
212
+
(is (= partition-1 actual-partition-2))
213
+
(is (= offset actual-offset-1))
214
+
(is (= cl-rdkafka/ll:rd-kafka-offset-stored actual-offset-2))))))))
215
+
184
216
(test consumer-member-id
185
217
(with-topics ((topic "consumer-member-id"))
186
218
(let* ((group "consumer-member-id-group")
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
1
1
;;; Copyright (C) 2018-2020 Sahil Kang <sahil.kang@asilaycomputing.com>
2
+
;;; Copyright 2022 Google LLC
2
3
;;;
3
4
;;; This file is part of cl-rdkafka.
4
5
;;;
@@ -103,14 +104,14 @@
103
104
do (kf:send producer topic "deadlock-message")
104
105
finally (kf:flush producer))
105
106
(loop
106
-
repeat (* 2 expected-messages)
107
+
repeat (* 20 expected-messages)
107
108
until (= expected-messages actual-messages)
108
109
do (poll))))
109
110
(unwind-protect
110
-
;; pass or fail within 1 minute
111
+
;; pass or fail within 2 minutes
111
112
(loop
112
113
initially (lparallel:submit-task channel #'poll-loop)
113
-
repeat 30
114
+
repeat 60
114
115
until (= expected-messages actual-messages)
115
116
do (sleep 2)
116
117
finally (is (= expected-messages actual-messages)))
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1
-
"1.1.0"
1
+
"1.2.0"
You can’t perform that action at this time.
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4