A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://pulsar.apache.org/docs/en/develop-binary-protocol/ below:

Pulsar binary protocol specification | Apache Pulsar

Pulsar binary protocol specification

Pulsar uses a custom binary protocol for communications between producers/consumers and brokers. This protocol is designed to support required features, such as acknowledgments and flow control, while ensuring maximum transport and implementation efficiency.

Clients and brokers exchange commands with each other. Commands are formatted as binary protocol buffer (aka protobuf) messages. The format of protobuf commands is specified in the PulsarApi.proto file and also documented in the Protobuf interface section below.

Connection sharing​

Commands for different producers and consumers can be interleaved and sent through the same connection without restriction.

All commands associated with Pulsar's protocol are contained in a BaseCommand protobuf message that includes a Type enum with all possible subcommands as optional fields. BaseCommand messages can specify only one subcommand.

Framing​

Since protobuf doesn't provide any sort of message frame, all messages in the Pulsar protocol are prepended with a 4-byte field that specifies the size of the frame. The maximum allowable size of a single frame is 5 MB.

The Pulsar protocol allows for two types of commands:

  1. Simple commands that do not carry a message payload.
  2. Payload commands that bear a payload that is used when publishing or delivering messages. In payload commands, the protobuf command data is followed by protobuf metadata and then the payload, which is passed in a raw format outside of protobuf. All sizes are passed as 4-byte unsigned big-endian integers.

Message payloads are passed in raw format rather than protobuf format for efficiency reasons.

Simple commands​

Simple (payload-free) commands have this basic structure:

Component Description Size (in bytes) totalSize The size of the frame, counting everything that comes after it (in bytes) 4 commandSize The size of the protobuf-serialized command 4 command The protobuf serialized command Message commands​

Payload commands have this basic structure:

Component Required or optional Description Size (in bytes) totalSize Required The size of the frame, counting everything that comes after it (in bytes) 4 commandSize Required The size of the protobuf-serialized command 4 command Required The protobuf serialized command magicNumberOfBrokerEntryMetadata Optional A 2-byte byte array (0x0e02) identifying the broker entry metadata
Note: magicNumberOfBrokerEntryMetadata , brokerEntryMetadataSize, and brokerEntryMetadata should be used together. 2 brokerEntryMetadataSize Optional The size of the broker entry metadata 4 brokerEntryMetadata Optional The broker entry metadata stored as a binary protobuf message magicNumber Required A 2-byte byte array (0x0e01) identifying the current format 2 checksum Required A CRC32-C checksum of everything that comes after it 4 metadataSize Required The size of the message metadata 4 metadata Required The message metadata stored as a binary protobuf message payload Required Anything left in the frame is considered the payload and can include any sequence of bytes Broker entry metadata​

Broker entry metadata is stored alongside the message metadata as a serialized protobuf message. It is created by the broker when the message arrived at the broker and passed without changes to the consumer if configured.

Field Required or optional Description broker_timestamp Optional The timestamp when a message arrived at the broker (id est as the number of milliseconds since January 1st, 1970 in UTC) index Optional The index of the message. It is assigned by the broker.

If you want to use broker entry metadata for brokers, configure the brokerEntryMetadataInterceptors parameter in the broker.conf file.

If you want to use broker entry metadata for consumers:

  1. Use the client protocol version 18 or later.

  2. Configure the brokerEntryMetadataInterceptors parameter and set the exposingBrokerEntryMetadataToClientEnabled parameter to true in the broker.conf file.

Message metadata is stored alongside the application-specified payload as a serialized protobuf message. Metadata is created by the producer and passed without changes to the consumer.

Field Required or optional Description producer_name Required The name of the producer that published the message sequence_id Required The sequence ID of the message, assigned by producer publish_time Required The publish timestamp in Unix time (i.e. as the number of milliseconds since January 1st, 1970 in UTC) properties Required A sequence of key/value pairs (using the KeyValue message). These are application-defined keys and values with no special meaning to Pulsar. replicated_from Optional Indicates that the message has been replicated and specifies the name of the cluster where the message was originally published partition_key Optional While publishing on a partitioned topic, if the key is present, the hash of the key is used to determine which partition to choose. Partition key is used as the message key. compression Optional Signals that payload has been compressed and with which compression library uncompressed_size Optional If compression is used, the producer must fill the uncompressed size field with the original payload size num_messages_in_batch Optional If this message is really a batch of multiple entries, this field must be set to the number of messages in the batch Batch messages​

When using batch messages, the payload will be containing a list of entries, each of them with its individual metadata, defined by the SingleMessageMetadata object.

For a single batch, the payload format will look like this:

Field Required or optional Description metadataSizeN Required The size of the single message metadata serialized Protobuf metadataN Required Single message metadata payloadN Required Message payload passed by application

Each metadata field looks like this;

Field Required or optional Description properties Required Application-defined properties partition key Optional Key to indicate the hashing to a particular partition payload_size Required Size of the payload for the single message in the batch

When compression is enabled, the whole batch will be compressed at once.

Interactions​ Connection establishment​

After opening a TCP connection to a broker, typically on port 6650, the client is responsible to initiate the session.

After receiving a Connected response from the broker, the client can consider the connection ready to use. Alternatively, if the broker doesn't validate the client authentication, it will reply with an Error command and close the TCP connection.

Example:

message CommandConnect {
"client_version" : "Pulsar-Client-Java-v1.15.2",
"auth_method_name" : "my-authentication-plugin",
"auth_data" : "my-auth-data",
"protocol_version" : 6
}

Fields:

message CommandConnected {
"server_version" : "Pulsar-Broker-v1.15.2",
"protocol_version" : 6
}

Fields:

Keep Alive​

To identify prolonged network partitions between clients and brokers or cases in which a machine crashes without interrupting the TCP connection on the remote end (eg: power outage, kernel panic, hard reboot...), we have introduced a mechanism to probe for the availability status of the remote peer.

Both clients and brokers are sending Ping commands periodically and they will close the socket if a Pong response is not received within a timeout (default used by broker is 60s).

A valid implementation of a Pulsar client is not required to send the Ping probe, though it is required to promptly reply after receiving one from the broker in order to prevent the remote side from forcibly closing the TCP connection.

Producer​

In order to send messages, a client needs to establish a producer. When creating a producer, the broker will first verify that this particular client is authorized to publish on the topic.

Once the client gets confirmation of the producer creation, it can publish messages to the broker, referring to the producer ID negotiated before.

If the client does not receive a response indicating producer creation success or failure, the client should first send a command to close the original producer before sending a command to re-attempt producer creation.

note

Before creating or connecting a producer, you need to perform topic lookup first.

Command Producer​
message CommandProducer {
"topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
"producer_id" : 1,
"request_id" : 1
}

Fields:

The broker will reply with either ProducerSuccess or Error commands.

Command ProducerSuccess​
message CommandProducerSuccess {
"request_id" : 1,
"producer_name" : "generated-unique-producer-name"
}

Fields:

Command Send​

Command Send is used to publish a new message within the context of an already existing producer. If a producer has not yet been created for the connection, the broker will terminate the connection. This command is used in a frame that includes command as well as message payload, for which the complete format is specified in the message commands section.

message CommandSend {
"producer_id" : 1,
"sequence_id" : 0,
"num_messages" : 1
}

Fields:

Command SendReceipt​

After a message has been persisted on the configured number of replicas, the broker will send the acknowledgment receipt to the producer.

message CommandSendReceipt {
"producer_id" : 1,
"sequence_id" : 0,
"message_id" : {
"ledgerId" : 123,
"entryId" : 456
}
}

Fields:

Command CloseProducer​

note

This command can be sent by either producer or broker.

When receiving a CloseProducer command, the broker will stop accepting any more messages for the producer, wait until all pending messages are persisted and then reply Success to the client.

If the client does not receive a response to a Producer command within a timeout, the client must first send a CloseProducer command before sending another Producer command. The client does not need to await a response to the CloseProducer command before sending the next Producer command.

The broker can send a CloseProducer command to client when it's performing a graceful failover (eg: broker is being restarted, or the topic is being unloaded by load balancer to be transferred to a different broker).

When receiving the CloseProducer, the client is expected to go through the service discovery lookup again and recreate the producer again. The TCP connection is not affected.

Consumer​

A consumer is used to attach to a subscription and consume messages from it. After every reconnection, a client needs to subscribe to the topic. If a subscription is not already there, a new one will be created.

note

Before creating or connecting a consumer, you need to perform topic lookup first.

If the client does not receive a response indicating consumer creation success or failure, the client should first send a command to close the original consumer before sending a command to re-attempt consumer creation.

Flow control​

After the consumer is ready, the client needs to give permission to the broker to push messages. This is done with the Flow command.

A Flow command gives additional permits to send messages to the consumer. A typical consumer implementation will use a queue to accumulate these messages before the application is ready to consume them.

After the application has dequeued half of the messages in the queue, the consumer sends permits to the broker to ask for more messages (equals to half of the messages in the queue).

For example, if the queue size is 1000 and the consumer consumes 500 messages in the queue. Then the consumer sends permits to the broker to ask for 500 messages.

Command Subscribe​
message CommandSubscribe {
"topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
"subscription" : "my-subscription-name",
"subType" : "Exclusive",
"consumer_id" : 1,
"request_id" : 1
}

Fields:

Command Flow​
message CommandFlow {
"consumer_id" : 1,
"messagePermits" : 1000
}

Fields:

Command Message​

Command Message is used by the broker to push messages to an existing consumer, within the limits of the given permits.

This command is used in a frame that includes the message payload as well, for which the complete format is specified in the message commands section.

message CommandMessage {
"consumer_id" : 1,
"message_id" : {
"ledgerId" : 123,
"entryId" : 456
}
}
Command Ack​

An Ack is used to signal to the broker that a given message has been successfully processed by the application and can be discarded by the broker.

In addition, the broker will also maintain the consumer position based on the acknowledged messages.

message CommandAck {
"consumer_id" : 1,
"ack_type" : "Individual",
"message_id" : {
"ledgerId" : 123,
"entryId" : 456
}
}

Fields:

Command AckResponse​

An AckResponse is the broker's response to acknowledge a request sent by the client. It contains the consumer_id sent in the request. If a transaction is used, it contains both the Transaction ID and the Request ID that are sent in the request. The client finishes the specific request according to the Request ID. If the error field is set, it indicates that the request has failed.

An example of AckResponse with redirection:

message CommandAckResponse {
"consumer_id" : 1,
"txnid_least_bits" = 0,
"txnid_most_bits" = 1,
"request_id" = 5
}
Command CloseConsumer​

note

This command can be sent by either producer or broker.

This command behaves the same as CloseProducer

If the client does not receive a response to a Subscribe command within a timeout, the client must first send a CloseConsumer command before sending another Subscribe command. The client does not need to await a response to the CloseConsumer command before sending the next Subscribe command.

Command RedeliverUnacknowledgedMessages​

A consumer can ask the broker to redeliver some or all of the pending messages that were pushed to that particular consumer and not yet acknowledged.

The protobuf object accepts a list of message IDs that the consumer wants to be redelivered. If the list is empty, the broker will redeliver all the pending messages.

On redelivery, messages can be sent to the same consumer or, in the case of a shared subscription, spread across all available consumers.

Command ReachedEndOfTopic​

This is sent by a broker to a particular consumer, whenever the topic has been "terminated" and all the messages on the subscription were acknowledged.

The client should use this command to notify the application that no more messages are coming from the consumer.

Command ConsumerStats​

This command is sent by the client to retrieve Subscriber and Consumer level stats from the broker.

Fields:

Command ConsumerStatsResponse​

This is the broker's response to ConsumerStats request by the client. It contains the Subscriber and Consumer level stats of the consumer_id sent in the request. If the error_code or the error_message field is set it indicates that the request has failed.

Command Unsubscribe​

This command is sent by the client to unsubscribe the consumer_id from the associated topic.

Fields:

Service discovery​ Topic lookup​

Topic lookup needs to be performed each time a client needs to create or reconnect a producer or a consumer. Lookup is used to discover which particular broker is serving the topic we are about to use.

Lookup can be done with a REST call as described in the admin API docs.

Since Pulsar-1.16 it is also possible to perform the lookup within the binary protocol.

For the sake of example, let's assume we have a service discovery component running at pulsar://broker.example.com:6650

Individual brokers will be running at pulsar://broker-1.example.com:6650, pulsar://broker-2.example.com:6650, ...

A client can use a connection to the discovery service host to issue a LookupTopic command. The response can either be a broker hostname to connect to, or a broker hostname to which retry the lookup.

The LookupTopic command has to be used in a connection that has already gone through the Connect / Connected initial handshake.

message CommandLookupTopic {
"topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
"request_id" : 1,
"authoritative" : false
}

Fields:

LookupTopicResponse​

An example of response with successful lookup:

message CommandLookupTopicResponse {
"request_id" : 1,
"response" : "Connect",
"brokerServiceUrl" : "pulsar://broker-1.example.com:6650",
"brokerServiceUrlTls" : "pulsar+ssl://broker-1.example.com:6651",
"authoritative" : true
}

This is an example of lookup response with redirection:

message CommandLookupTopicResponse {
"request_id" : 1,
"response" : "Redirect",
"brokerServiceUrl" : "pulsar://broker-2.example.com:6650",
"brokerServiceUrlTls" : "pulsar+ssl://broker-2.example.com:6651",
"authoritative" : true
}

In this second case, we need to reissue the LookupTopic command request to broker-2.example.com and this broker will be able to give a definitive answer to the lookup request.

Partitioned topics discovery​

Partitioned topics metadata discovery is used to find out if a topic is a "partitioned topic" and how many partitions were set up.

If the topic is marked as "partitioned", the client is expected to create multiple producers or consumers, one for each partition, using the partition-X suffix.

This information only needs to be retrieved the first time a producer or consumer is created. There is no need to do this after reconnections.

The discovery of partitioned topics metadata works very similarly to the topic lookup. The client send a request to the service discovery address and the response will contain actual metadata.

Command PartitionedTopicMetadata​
message CommandPartitionedTopicMetadata {
"topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
"request_id" : 1
}

Fields:

Command PartitionedTopicMetadataResponse​

An example of response with metadata:

message CommandPartitionedTopicMetadataResponse {
"request_id" : 1,
"response" : "Success",
"partitions" : 32
}
Protobuf interface​

All Pulsar's Protobuf definitions can be found here.


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4