+89
-27
lines changedFilter options
+89
-27
lines changed Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
1
1
# Changelog
2
2
3
+
## [1.0.2] - 2020-04-20
4
+
5
+
### Fixed
6
+
7
+
* Fix deadlock bug described in
8
+
[issue #58](https://github.com/SahilKang/cl-rdkafka/issues/58) from
9
+
[@kat-co](https://github.com/kat-co).
10
+
3
11
## [1.0.1] - 2020-01-14
4
12
5
13
### Fixed
Original file line number Diff line number Diff line change
@@ -247,17 +247,18 @@ STORE-FUNCTION restart will be provided if it's a serde condition."
247
247
(cl-rdkafka/ll:rd-kafka-message-destroy rd-kafka-message))))))
248
248
249
249
(defun %commit (rd-kafka-consumer toppar-list rd-kafka-queue)
250
-
(let ((err (cl-rdkafka/ll:rd-kafka-commit-queue
251
-
rd-kafka-consumer
252
-
toppar-list
253
-
rd-kafka-queue
254
-
(cffi:null-pointer)
255
-
(cffi:null-pointer))))
256
-
(unless (eq err 'cl-rdkafka/ll:rd-kafka-resp-err-no-error)
257
-
(error (make-rdkafka-error err)))
258
-
(let ((promise (lparallel:promise)))
259
-
(enqueue-payload rd-kafka-queue promise)
260
-
promise)))
250
+
(bt:with-lock-held (+address->queue-lock+)
251
+
(let ((err (cl-rdkafka/ll:rd-kafka-commit-queue
252
+
rd-kafka-consumer
253
+
toppar-list
254
+
rd-kafka-queue
255
+
(cffi:null-pointer)
256
+
(cffi:null-pointer))))
257
+
(unless (eq err 'cl-rdkafka/ll:rd-kafka-resp-err-no-error)
258
+
(error (make-rdkafka-error err)))
259
+
(let ((promise (lparallel:promise)))
260
+
(enqueue-payload rd-kafka-queue promise)
261
+
promise))))
261
262
262
263
(defmethod commit ((consumer consumer) &key offsets asyncp)
263
264
"Commit OFFSETS to broker.
Original file line number Diff line number Diff line change
@@ -136,10 +136,9 @@
136
136
(error c))))
137
137
138
138
(defun enqueue-payload (rd-kafka-queue payload)
139
-
(bt:with-lock-held (+address->queue-lock+)
140
-
(let* ((address (cffi:pointer-address rd-kafka-queue))
141
-
(queue (cdr (gethash address +address->queue+))))
142
-
(lparallel.queue:push-queue payload queue))))
139
+
(let* ((address (cffi:pointer-address rd-kafka-queue))
140
+
(queue (cdr (gethash address +address->queue+))))
141
+
(lparallel.queue:push-queue payload queue)))
143
142
144
143
(defun deregister-rd-kafka-queue (rd-kafka-queue)
145
144
(let ((address (cffi:pointer-address rd-kafka-queue)))
Original file line number Diff line number Diff line change
@@ -243,17 +243,18 @@ STORE-FUNCTION restart will be provided if it's a serde condition."
243
243
(let ((key-bytes (if key-p (apply-serde key-serde key) (vector)))
244
244
(value-bytes (apply-serde value-serde value))
245
245
(partition (or partition cl-rdkafka/ll:rd-kafka-partition-ua)))
246
-
(%send rd-kafka-producer
247
-
topic
248
-
partition
249
-
key-bytes
250
-
value-bytes
251
-
headers
252
-
(or timestamp 0))
253
-
(let ((promise (lparallel:promise)))
254
-
(enqueue-payload rd-kafka-queue (list promise key value))
255
-
(setf last-promise promise)
256
-
(make-instance 'future :promise promise :client producer)))))
246
+
(bt:with-lock-held (+address->queue-lock+)
247
+
(%send rd-kafka-producer
248
+
topic
249
+
partition
250
+
key-bytes
251
+
value-bytes
252
+
headers
253
+
(or timestamp 0))
254
+
(let ((promise (lparallel:promise)))
255
+
(enqueue-payload rd-kafka-queue (list promise key value))
256
+
(setf last-promise promise)
257
+
(make-instance 'future :promise promise :client producer))))))
257
258
258
259
;; using rd_kafka_flush with rd_kafka_event_dr would cause a sporadic
259
260
;; NULL dereference for some reason. My gut feeling is that some race
Original file line number Diff line number Diff line change
@@ -65,3 +65,56 @@
65
65
(let ((expected (produce-messages topic))
66
66
(actual (consume-messages topic)))
67
67
(is (equal expected actual)))))
68
+
69
+
70
+
(test deadlock
71
+
(with-topics ((topic "deadlock-topic"))
72
+
(let* ((expected-messages 100000)
73
+
(actual-messages 0)
74
+
(lock (bt:make-lock "deadlock"))
75
+
(kernel (lparallel:make-kernel 5 :name "deadlock"))
76
+
(channel (let ((lparallel:*kernel* kernel))
77
+
(lparallel:make-channel)))
78
+
(producer (make-instance
79
+
'kf:producer
80
+
:conf (list "bootstrap.servers" *bootstrap-servers*)
81
+
:serde #'babel:string-to-octets))
82
+
(consumer (make-instance
83
+
'kf:consumer
84
+
:conf (list "bootstrap.servers" *bootstrap-servers*
85
+
"group.id" "deadlock-group-id"
86
+
"enable.auto.commit" "false"
87
+
"auto.offset.reset" "earliest"
88
+
"offset.store.method" "broker"
89
+
"enable.partition.eof" "false")
90
+
:serde #'babel:octets-to-string)))
91
+
(labels ((process ()
92
+
(bt:with-lock-held (lock)
93
+
(incf actual-messages)
94
+
(kf:commit consumer :asyncp t)))
95
+
(poll ()
96
+
(let (message)
97
+
(bt:with-lock-held (lock)
98
+
(setf message (kf:poll consumer 5)))
99
+
(when message
100
+
(lparallel:submit-task channel #'process))))
101
+
(poll-loop ()
102
+
(loop
103
+
initially (kf:subscribe consumer topic)
104
+
repeat expected-messages
105
+
do (kf:send producer topic "deadlock-message")
106
+
finally (kf:flush producer))
107
+
(loop
108
+
repeat (* 2 expected-messages)
109
+
until (= expected-messages actual-messages)
110
+
do (poll))))
111
+
(unwind-protect
112
+
;; pass or fail within 1 minute
113
+
(loop
114
+
initially (lparallel:submit-task channel #'poll-loop)
115
+
repeat 30
116
+
until (= expected-messages actual-messages)
117
+
do (sleep 2)
118
+
finally (is (= expected-messages actual-messages)))
119
+
(let ((lparallel:*kernel* kernel))
120
+
(lparallel:end-kernel)))))))
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1
-
"1.0.1"
1
+
"1.0.2"
You can’t perform that action at this time.
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4