Need to get messages from Pulsar and acknowledge them, so that Pulsar can keep track of your progress? Then a consumer is what we want.
Start with creating a client.
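A minimal sketch of creating the client, assuming a broker is reachable at the hypothetical local address pulsar://localhost:6650. The builder calls shown here may differ between DotPulsar versions.

```csharp
using System;
using DotPulsar;

// Assumption: a Pulsar broker is listening on this (hypothetical) local address.
var serviceUrl = new Uri("pulsar://localhost:6650");

// 'await using' disposes the client when it goes out of scope.
await using var client = PulsarClient.Builder()
    .ServiceUrl(serviceUrl)
    .Build();
```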
When creating a consumer we have these options:
Only the subscription name and topic are required.
Create a consumer using the builder
var consumer = client.NewConsumer()
    .SubscriptionName("MySubscription")
    .Topic("persistent://public/default/mytopic")
    .Create();
Create a consumer without the builder
var options = new ConsumerOptions("MySubscription", "persistent://public/default/mytopic");
var consumer = client.CreateConsumer(options);
We can either 'Receive' a single message or have an IAsyncEnumerable for 'Messages'.
If we just want a single message:
var message = await consumer.Receive();
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
Receive messages as an IAsyncEnumerable
There's an extension method in "DotPulsar.Extensions" for reading messages as an IAsyncEnumerable.
await foreach (var message in consumer.Messages())
{
    Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}
The message has these properties:
We have two options when acknowledging messages.
To just acknowledge a single message:
await consumer.Acknowledge(message);
Cumulative acknowledgment
To acknowledge all messages up to and including the provided message:
await consumer.AcknowledgeCumulative(message);
Please do note: We can't do this if the subscription is shared!
Need to reset the cursor? That's easy:
await consumer.Seek(MessageId.Earliest); // Provide the message-id to reset to.
Please do note: When seeking we will briefly be disconnected, as that's how Pulsar handles seek requests.
Unsubscribing is as easy as:
await consumer.Unsubscribe();
Please do note: We can't use the consumer after unsubscribing and therefore should dispose it.
Monitoring consumer state
Monitoring the state is recommended and described here.
The Consumer implements IAsyncDisposable and should be disposed when it's no longer needed. Disposing the client will dispose all consumers.
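The lifecycle described on this page can be sketched end to end as follows. This only reuses the calls shown above; the service URL is a hypothetical local address, and the API shape is assumed to match the DotPulsar version this page documents, so it may not compile unchanged against other releases.

```csharp
using System;
using System.Buffers; // ToArray() extension for ReadOnlySequence<byte>
using System.Text;
using DotPulsar;

// Assumption: a broker is reachable at this (hypothetical) local address.
await using var client = PulsarClient.Builder()
    .ServiceUrl(new Uri("pulsar://localhost:6650"))
    .Build();

// Declared after the client, so the consumer is disposed first when the scope ends.
await using var consumer = client.NewConsumer()
    .SubscriptionName("MySubscription")
    .Topic("persistent://public/default/mytopic")
    .Create();

// Receive one message, print its payload as UTF-8 text, then acknowledge it.
var message = await consumer.Receive();
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
await consumer.Acknowledge(message);
```

Because both objects are declared with 'await using', no explicit DisposeAsync call is needed. Disposing only the client would also dispose the consumer, as noted above.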