Consumer · apache/pulsar-dotpulsar Wiki · GitHub

Need to get messages from Pulsar and acknowledge them, so that Pulsar can keep track of your progress? Then a consumer is what we want.

Start with creating a client.

When creating a consumer we have these options:

Only the subscription name and topic are required.

Create a consumer using the builder

var consumer = client.NewConsumer()
                     .SubscriptionName("MySubscription")
                     .Topic("persistent://public/default/mytopic")
                     .Create();

Create a consumer without the builder

var options = new ConsumerOptions("MySubscription", "persistent://public/default/mytopic");
var consumer = client.CreateConsumer(options);
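For context, here is a minimal sketch of the full setup, from client to consumer. It assumes a recent DotPulsar release, a C# 9+ project (top-level statements) and a broker listening on pulsar://localhost:6650. The two optional builder calls (SubscriptionType and InitialPosition) and the values passed to them are illustrative choices, not recommendations from this page.

```csharp
using System;
using DotPulsar;
using DotPulsar.Extensions;

// Assumption: a local standalone broker; replace the URL with your own.
await using var client = PulsarClient.Builder()
    .ServiceUrl(new Uri("pulsar://localhost:6650"))
    .Build();

// Only SubscriptionName and Topic are required; the other two calls are optional.
await using var consumer = client.NewConsumer()
    .SubscriptionName("MySubscription")
    .Topic("persistent://public/default/mytopic")
    .SubscriptionType(SubscriptionType.Exclusive)           // illustrative choice
    .InitialPosition(SubscriptionInitialPosition.Earliest)  // illustrative choice
    .Create();

Console.WriteLine("Consumer created.");
```

Declaring both with 'await using' means they are disposed automatically when they go out of scope.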

We can either 'Receive' a single message or have an IAsyncEnumerable for 'Messages'.

If we just want a single message:

// Encoding lives in System.Text; ToArray() on message.Data comes from System.Buffers.
var message = await consumer.Receive();
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));

Receive messages as an IAsyncEnumerable

There's an extension method in "DotPulsar.Extensions" for reading messages as an IAsyncEnumerable.

await foreach (var message in consumer.Messages())
{
    Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}

The message has these properties:

We have two options when acknowledging messages.

To acknowledge just a single message:

await consumer.Acknowledge(message);

Cumulative acknowledgment

To acknowledge all messages up to and including the provided message:

await consumer.AcknowledgeCumulative(message);

Please do note: We can't do this if the subscription is shared!
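Receiving and acknowledging can be combined into one loop. The following is only a sketch: it assumes a recent DotPulsar release, a C# 9+ project, a broker at the client's default service URL, and a subscription that is not shared (the consumer is created without an explicit subscription type). AckInterval is a hypothetical value chosen for illustration.

```csharp
using System;
using System.Buffers;
using System.Text;
using System.Threading;
using DotPulsar;
using DotPulsar.Extensions;

// Hypothetical batch size: acknowledge cumulatively once per 100 messages.
const int AckInterval = 100;

await using var client = PulsarClient.Builder().Build();

await using var consumer = client.NewConsumer()
    .SubscriptionName("MySubscription")
    .Topic("persistent://public/default/mytopic")
    .Create();

// Cancel the loop on Ctrl+C instead of killing the process.
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

var sinceLastAck = 0;

try
{
    await foreach (var message in consumer.Messages(cts.Token))
    {
        Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));

        if (++sinceLastAck >= AckInterval)
        {
            // Acknowledges this message and every earlier one on the subscription.
            await consumer.AcknowledgeCumulative(message, cts.Token);
            sinceLastAck = 0;
        }
    }
}
catch (OperationCanceledException)
{
    // Expected on Ctrl+C. Messages received since the last acknowledgment
    // stay unacknowledged and will be delivered again.
}
```

Acknowledging in batches trades fewer acknowledgment requests for possible redelivery of the last partial batch after a restart, so the message handling should tolerate duplicates.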

Need to reset the cursor? That's easy:

await consumer.Seek(MessageId.Earliest); // Provide the message-id to reset to.

Please do note: When seeking we will briefly be disconnected, as that's how Pulsar handles seek requests.

Unsubscribing is as easy as:

await consumer.Unsubscribe();

Please do note: We can't use the consumer after unsubscribing and therefore should dispose it.

Monitoring consumer state

Monitoring the state is recommended and described here.

The Consumer implements IAsyncDisposable and should be disposed when it's no longer needed. Disposing the client will dispose all consumers.

