A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://github.com/SahilKang/cl-rdkafka/commit/4c05c1aaf59648161f2717b944f3c36b42472195 below:

Merge pull request #60 from SahilKang/deadlock · SahilKang/cl-rdkafka@4c05c1a · GitHub

File tree Expand file treeCollapse file tree 6 files changed

+89

-27

lines changed

Filter options

Expand file treeCollapse file tree 6 files changed

+89

-27

lines changed Original file line number Diff line number Diff line change

@@ -1,5 +1,13 @@

1 1

# Changelog

2 2 3 +

## [1.0.2] - 2020-04-20

4 + 5 +

### Fixed

6 + 7 +

* Fix deadlock bug described in

8 +

[issue #58](https://github.com/SahilKang/cl-rdkafka/issues/58) from

9 +

[@kat-co](https://github.com/kat-co).

10 + 3 11

## [1.0.1] - 2020-01-14

4 12 5 13

### Fixed

Original file line number Diff line number Diff line change

@@ -247,17 +247,18 @@ STORE-FUNCTION restart will be provided if it's a serde condition."

247 247

(cl-rdkafka/ll:rd-kafka-message-destroy rd-kafka-message))))))

248 248 249 249

(defun %commit (rd-kafka-consumer toppar-list rd-kafka-queue)

250 -

(let ((err (cl-rdkafka/ll:rd-kafka-commit-queue

251 -

rd-kafka-consumer

252 -

toppar-list

253 -

rd-kafka-queue

254 -

(cffi:null-pointer)

255 -

(cffi:null-pointer))))

256 -

(unless (eq err 'cl-rdkafka/ll:rd-kafka-resp-err-no-error)

257 -

(error (make-rdkafka-error err)))

258 -

(let ((promise (lparallel:promise)))

259 -

(enqueue-payload rd-kafka-queue promise)

260 -

promise)))

250 +

(bt:with-lock-held (+address->queue-lock+)

251 +

(let ((err (cl-rdkafka/ll:rd-kafka-commit-queue

252 +

rd-kafka-consumer

253 +

toppar-list

254 +

rd-kafka-queue

255 +

(cffi:null-pointer)

256 +

(cffi:null-pointer))))

257 +

(unless (eq err 'cl-rdkafka/ll:rd-kafka-resp-err-no-error)

258 +

(error (make-rdkafka-error err)))

259 +

(let ((promise (lparallel:promise)))

260 +

(enqueue-payload rd-kafka-queue promise)

261 +

promise))))

261 262 262 263

(defmethod commit ((consumer consumer) &key offsets asyncp)

263 264

"Commit OFFSETS to broker.

Original file line number Diff line number Diff line change

@@ -136,10 +136,9 @@

136 136

(error c))))

137 137 138 138

(defun enqueue-payload (rd-kafka-queue payload)

139 -

(bt:with-lock-held (+address->queue-lock+)

140 -

(let* ((address (cffi:pointer-address rd-kafka-queue))

141 -

(queue (cdr (gethash address +address->queue+))))

142 -

(lparallel.queue:push-queue payload queue))))

139 +

(let* ((address (cffi:pointer-address rd-kafka-queue))

140 +

(queue (cdr (gethash address +address->queue+))))

141 +

(lparallel.queue:push-queue payload queue)))

143 142 144 143

(defun deregister-rd-kafka-queue (rd-kafka-queue)

145 144

(let ((address (cffi:pointer-address rd-kafka-queue)))

Original file line number Diff line number Diff line change

@@ -243,17 +243,18 @@ STORE-FUNCTION restart will be provided if it's a serde condition."

243 243

(let ((key-bytes (if key-p (apply-serde key-serde key) (vector)))

244 244

(value-bytes (apply-serde value-serde value))

245 245

(partition (or partition cl-rdkafka/ll:rd-kafka-partition-ua)))

246 -

(%send rd-kafka-producer

247 -

topic

248 -

partition

249 -

key-bytes

250 -

value-bytes

251 -

headers

252 -

(or timestamp 0))

253 -

(let ((promise (lparallel:promise)))

254 -

(enqueue-payload rd-kafka-queue (list promise key value))

255 -

(setf last-promise promise)

256 -

(make-instance 'future :promise promise :client producer)))))

246 +

(bt:with-lock-held (+address->queue-lock+)

247 +

(%send rd-kafka-producer

248 +

topic

249 +

partition

250 +

key-bytes

251 +

value-bytes

252 +

headers

253 +

(or timestamp 0))

254 +

(let ((promise (lparallel:promise)))

255 +

(enqueue-payload rd-kafka-queue (list promise key value))

256 +

(setf last-promise promise)

257 +

(make-instance 'future :promise promise :client producer))))))

257 258 258 259

;; using rd_kafka_flush with rd_kafka_event_dr would cause a sporadic

259 260

;; NULL dereference for some reason. My gut feeling is that some race

Original file line number Diff line number Diff line change

@@ -65,3 +65,56 @@

65 65

(let ((expected (produce-messages topic))

66 66

(actual (consume-messages topic)))

67 67

(is (equal expected actual)))))

68 + 69 + 70 +

(test deadlock

71 +

(with-topics ((topic "deadlock-topic"))

72 +

(let* ((expected-messages 100000)

73 +

(actual-messages 0)

74 +

(lock (bt:make-lock "deadlock"))

75 +

(kernel (lparallel:make-kernel 5 :name "deadlock"))

76 +

(channel (let ((lparallel:*kernel* kernel))

77 +

(lparallel:make-channel)))

78 +

(producer (make-instance

79 +

'kf:producer

80 +

:conf (list "bootstrap.servers" *bootstrap-servers*)

81 +

:serde #'babel:string-to-octets))

82 +

(consumer (make-instance

83 +

'kf:consumer

84 +

:conf (list "bootstrap.servers" *bootstrap-servers*

85 +

"group.id" "deadlock-group-id"

86 +

"enable.auto.commit" "false"

87 +

"auto.offset.reset" "earliest"

88 +

"offset.store.method" "broker"

89 +

"enable.partition.eof" "false")

90 +

:serde #'babel:octets-to-string)))

91 +

(labels ((process ()

92 +

(bt:with-lock-held (lock)

93 +

(incf actual-messages)

94 +

(kf:commit consumer :asyncp t)))

95 +

(poll ()

96 +

(let (message)

97 +

(bt:with-lock-held (lock)

98 +

(setf message (kf:poll consumer 5)))

99 +

(when message

100 +

(lparallel:submit-task channel #'process))))

101 +

(poll-loop ()

102 +

(loop

103 +

initially (kf:subscribe consumer topic)

104 +

repeat expected-messages

105 +

do (kf:send producer topic "deadlock-message")

106 +

finally (kf:flush producer))

107 +

(loop

108 +

repeat (* 2 expected-messages)

109 +

until (= expected-messages actual-messages)

110 +

do (poll))))

111 +

(unwind-protect

112 +

;; pass or fail within 1 minute

113 +

(loop

114 +

initially (lparallel:submit-task channel #'poll-loop)

115 +

repeat 30

116 +

until (= expected-messages actual-messages)

117 +

do (sleep 2)

118 +

finally (is (= expected-messages actual-messages)))

119 +

(let ((lparallel:*kernel* kernel))

120 +

(lparallel:end-kernel)))))))

Original file line number Diff line number Diff line change

@@ -1 +1 @@

1 -

"1.0.1"

1 +

"1.0.2"

You can’t perform that action at this time.


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4