+17
-13
lines changedFilter options
+17
-13
lines changed Original file line number Diff line number Diff line change
@@ -10,7 +10,6 @@ import (
10
10
"os"
11
11
"strings"
12
12
"testing"
13
-
"time"
14
13
15
14
"github.com/IBM/sarama"
16
15
"github.com/google/uuid"
@@ -91,25 +90,30 @@ func testClient(t testing.TB) sarama.Client {
91
90
}
92
91
93
92
func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receiver, string) {
94
-
client := testClient(t)
95
-
96
93
topicName := "test-ce-client-" + uuid.New().String()
97
-
p, err := kafka_sarama.NewProtocolFromClient(client, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID))
94
+
95
+
// Create a 'client' and 'protocol" for the Receiver side
96
+
clientR := testClient(t)
97
+
protocolR, err := kafka_sarama.NewProtocolFromClient(clientR, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID))
98
98
require.NoError(t, err)
99
-
require.NotNil(t, p)
99
+
require.NotNil(t, protocolR)
100
+
101
+
// Create a 'client' and 'protocol" for the Sender side
102
+
clientS := testClient(t)
103
+
protocolS, err := kafka_sarama.NewProtocolFromClient(clientS, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID))
104
+
require.NoError(t, err)
105
+
require.NotNil(t, protocolS)
100
106
101
107
go func() {
102
-
require.NoError(t, p.OpenInbound(context.TODO()))
108
+
require.NoError(t, protocolR.OpenInbound(context.TODO()))
103
109
}()
104
110
105
-
// Not perfect but we need to give OpenInbound() as chance to start
106
-
// as it's a race condition. I couldn't find something on 'p' to wait for
107
-
time.Sleep(6 * time.Second)
108
-
109
111
return func() {
110
-
require.NoError(t, p.Close(context.TODO()))
111
-
require.NoError(t, client.Close())
112
-
}, p, p, topicName
112
+
require.NoError(t, protocolR.Close(context.TODO()))
113
+
require.NoError(t, protocolS.Close(context.TODO()))
114
+
require.NoError(t, clientR.Close())
115
+
require.NoError(t, clientS.Close())
116
+
}, protocolS, protocolR, topicName
113
117
}
114
118
115
119
func BenchmarkSendReceive(b *testing.B) {
You can’t perform that action at this time.
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4