@@ -27,11 +27,14 @@ import (
27
27
)
28
28
29
29
const (
30
-
TEST_GROUP_ID = "test_group_id"
30
+
TEST_GROUP_ID = "test_group_id"
31
+
KAFKA_OFFSET = "kafkaoffset"
32
+
KAFKA_PARTITION = "kafkapartition"
33
+
KAFKA_TOPIC = "kafkatopic"
31
34
)
32
35
33
36
func TestSendStructuredMessageToStructured(t *testing.T) {
34
-
close, s, r := testSenderReceiver(t)
37
+
close, s, r, _ := testSenderReceiver(t)
35
38
defer close()
36
39
EachEvent(t, Events(), func(t *testing.T, eventIn event.Event) {
37
40
eventIn = ConvertEventExtensionsToString(t, eventIn)
@@ -46,7 +49,7 @@ func TestSendStructuredMessageToStructured(t *testing.T) {
46
49
}
47
50
48
51
func TestSendBinaryMessageToBinary(t *testing.T) {
49
-
close, s, r := testSenderReceiver(t)
52
+
close, s, r, topicName := testSenderReceiver(t)
50
53
defer close()
51
54
EachEvent(t, Events(), func(t *testing.T, eventIn event.Event) {
52
55
eventIn = ConvertEventExtensionsToString(t, eventIn)
@@ -55,7 +58,16 @@ func TestSendBinaryMessageToBinary(t *testing.T) {
55
58
test.SendReceive(t, binding.WithPreferredEventEncoding(context.TODO(), binding.EncodingBinary), in, s, r, func(out binding.Message) {
56
59
eventOut := MustToEvent(t, context.Background(), out)
57
60
assert.Equal(t, binding.EncodingBinary, out.ReadEncoding())
58
-
AssertEventEquals(t, eventIn, ConvertEventExtensionsToString(t, eventOut))
61
+
eventOut = ConvertEventExtensionsToString(t, eventOut)
62
+
63
+
require.Equal(t, topicName, eventOut.Extensions()[KAFKA_TOPIC])
64
+
require.NotNil(t, eventOut.Extensions()[KAFKA_PARTITION])
65
+
require.NotNil(t, eventOut.Extensions()[KAFKA_OFFSET])
66
+
67
+
AllOf(
68
+
HasExactlyAttributesEqualTo(eventIn.Context),
69
+
HasData(eventIn.Data()),
70
+
)
59
71
})
60
72
})
61
73
}
@@ -82,7 +94,7 @@ func testClient(t testing.TB) sarama.Client {
82
94
return client
83
95
}
84
96
85
-
func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receiver) {
97
+
func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receiver, string) {
86
98
client := testClient(t)
87
99
88
100
topicName := "test-ce-client-" + uuid.New().String()
@@ -97,11 +109,11 @@ func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receive
97
109
return func() {
98
110
require.NoError(t, p.Close(context.TODO()))
99
111
require.NoError(t, client.Close())
100
-
}, p, p
112
+
}, p, p, topicName
101
113
}
102
114
103
115
func BenchmarkSendReceive(b *testing.B) {
104
-
c, s, r := testSenderReceiver(b)
116
+
c, s, r, _ := testSenderReceiver(b)
105
117
defer c() // Cleanup
106
118
test.BenchmarkSendReceive(b, s, r)
107
119
}
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4