A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://github.com/cloudevents/sdk-go/commit/6aa2742197a8484572f83206ac6749826706ca7e below:

context.Done() may never reach if waiting on r.incoming <- msgErr · cloudevents/sdk-go@6aa2742 · GitHub

File tree Expand file treeCollapse file tree 1 file changed

+10

-2

lines changed

Filter options

Expand file treeCollapse file tree 1 file changed

+10

-2

lines changed Original file line number Diff line number Diff line change

@@ -66,15 +66,23 @@ func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram

66 66

return nil

67 67

}

68 68

m := NewMessageFromConsumerMessage(msg)

69 - 70 -

r.incoming <- msgErr{

69 +

msgErrObj := msgErr{

71 70

msg: binding.WithFinish(m, func(err error) {

72 71

if protocol.IsACK(err) {

73 72

session.MarkMessage(msg, "")

74 73

}

75 74

}),

76 75

}

77 76 77 +

// Need to use select clause here, otherwise r.incoming <- msgErrObj can become a blocking operation,

78 +

// resulting in never reaching outside block's case <-session.Context().Done()

79 +

select {

80 +

case r.incoming <- msgErrObj:

81 +

// do nothing

82 +

case <-session.Context().Done():

83 +

return nil

84 +

}

85 + 78 86

// Should return when `session.Context()` is done.

79 87

// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:

80 88

// https://github.com/Shopify/sarama/issues/1192

You can’t perform that action at this time.


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4