+10
-2
lines changedFilter options
+10
-2
lines changed Original file line number Diff line number Diff line change
@@ -66,15 +66,23 @@ func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
66
66
return nil
67
67
}
68
68
m := NewMessageFromConsumerMessage(msg)
69
-
70
-
r.incoming <- msgErr{
69
+
msgErrObj := msgErr{
71
70
msg: binding.WithFinish(m, func(err error) {
72
71
if protocol.IsACK(err) {
73
72
session.MarkMessage(msg, "")
74
73
}
75
74
}),
76
75
}
77
76
77
+
// Need to use select clause here, otherwise r.incoming <- msgErrObj can become a blocking operation,
78
+
// resulting in never reaching outside block's case <-session.Context().Done()
79
+
select {
80
+
case r.incoming <- msgErrObj:
81
+
// do nothing
82
+
case <-session.Context().Done():
83
+
return nil
84
+
}
85
+
78
86
// Should return when `session.Context()` is done.
79
87
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
80
88
// https://github.com/Shopify/sarama/issues/1192
You can’t perform that action at this time.
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4