Cloudflare Queues is integrated with Cloudflare Workers. To send and receive messages, you must use a Worker.
A Worker that can send messages to a Queue is a producer Worker, while a Worker that can receive messages from a Queue is a consumer Worker. It is possible for the same Worker to be a producer and consumer, if desired.
In the future, we expect to support other APIs, such as HTTP endpoints to send or receive messages. To report bugs or request features, go to the Cloudflare Community Forums. To give feedback, go to the #queues Discord channel.
These APIs allow a producer Worker to send messages to a Queue.
An example of writing a single message to a Queue:
type Environment = {
  readonly MY_QUEUE: Queue;
};
export default {
  async fetch(req: Request, env: Environment): Promise<Response> {
    await env.MY_QUEUE.send({
      url: req.url,
      method: req.method,
      headers: Object.fromEntries(req.headers),
    });
    return new Response('Sent!');
  },
};
The Queues API also supports writing multiple messages at once:
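const sendResultsToQueue = async (results: Array<any>, env: Environment) => {
  // Wrap each result in a MessageSendRequest before sending the batch.
  const batch: MessageSendRequest[] = results.map((value) => ({
    body: JSON.stringify(value),
  }));
  // Send through the MY_QUEUE binding declared on Environment.
  await env.MY_QUEUE.sendBatch(batch);
};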
A binding that allows a producer to send messages to a Queue.
interface Queue<Body = unknown> {
  send(body: Body, options?: QueueSendOptions): Promise<void>;
  sendBatch(messages: Iterable<MessageSendRequest<Body>>, options?: QueueSendBatchOptions): Promise<void>;
}
- send(body: unknown, options?: { contentType?: QueuesContentType }): Promise<void>
- sendBatch(body: Iterable<MessageSendRequest<unknown>>): Promise<void>
A wrapper type used for sending message batches.
type MessageSendRequest<Body = unknown> = {
  body: Body;
  options?: QueueSendOptions;
};
- body: unknown
- options: QueueSendOptions
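Because each MessageSendRequest carries its own options, a batch can mix settings per message. The following is a minimal sketch rather than an official example: the UserEvent shape and the toBatch helper are hypothetical, and the type aliases are local copies of the documented definitions so the snippet stands alone.

```typescript
// Local copies of the documented types so the sketch is self-contained.
// In a real Worker these are provided by the Workers runtime types.
type QueuesContentType = "text" | "bytes" | "json" | "v8";
type QueueSendOptions = { contentType?: QueuesContentType; delaySeconds?: number };
type MessageSendRequest<Body = unknown> = { body: Body; options?: QueueSendOptions };

// Hypothetical event shape, used only for illustration.
type UserEvent = { userId: string; kind: "signup" | "reminder" };

// Hypothetical helper: wraps each event in a MessageSendRequest and
// attaches delaySeconds: 60 to reminder events only.
function toBatch(events: UserEvent[]): MessageSendRequest<UserEvent>[] {
  return events.map((event) =>
    event.kind === "reminder"
      ? { body: event, options: { delaySeconds: 60 } }
      : { body: event }
  );
}
```

In a producer Worker, the result could be passed straight to sendBatch, for example await env.MY_QUEUE.sendBatch(toBatch(events)).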
QueueSendOptions is optional configuration that applies when sending a message to a queue.
- contentType: QueuesContentType
  contentType will be used by alternative consumer types to explicitly mark messages as serialized so they can be consumed in the desired type.
- delaySeconds: number
QueueSendBatchOptions is optional configuration that applies when sending a batch of messages to a queue.
- delaySeconds: number
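As a rough illustration of the send options, the sketch below passes both contentType and delaySeconds to send(). The sendReminder helper is hypothetical and the types are local stand-ins for the documented ones. It also assumes that delaySeconds postpones delivery by that many seconds, which this section lists but does not describe.

```typescript
// Local stand-ins for the documented types so the sketch is self-contained.
type QueuesContentType = "text" | "bytes" | "json" | "v8";
type QueueSendOptions = { contentType?: QueuesContentType; delaySeconds?: number };
interface Queue<Body = unknown> {
  send(body: Body, options?: QueueSendOptions): Promise<void>;
}

// Hypothetical helper: queues a plain-text reminder, marked with the "text"
// content type and carrying a delaySeconds value of 600 (assumed: ten minutes).
async function sendReminder(queue: Queue<string>, text: string): Promise<void> {
  await queue.send(text, { contentType: "text", delaySeconds: 600 });
}
```

Inside a fetch handler this might be called as await sendReminder(env.MY_QUEUE, "Stand-up in 10 minutes"), assuming MY_QUEUE is bound as in the earlier producer example.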
A union type containing valid message content types.
// Default: json
type QueuesContentType = "text" | "bytes" | "json" | "v8";
- Use "json" to send a JavaScript object that can be JSON-serialized. This content type can be previewed from the Cloudflare dashboard. The json content type is the default.
- Use "text" to send a String. This content type can be previewed with the List messages from the dashboard feature.
- Use "bytes" to send an ArrayBuffer. This content type cannot be previewed from the Cloudflare dashboard and will display as Base64-encoded.
- Use "v8" to send a JavaScript object that cannot be JSON-serialized but is supported by structured clone (for example Date and Map). This content type cannot be previewed from the Cloudflare dashboard and will display as Base64-encoded.
Note
The default content type for Queues changed to json (from v8) to improve compatibility with pull-based consumers for any Workers with a compatibility date after 2024-03-18.
If you specify an invalid content type, or if your specified content type does not match the message content's type, the send operation will fail with an error.
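Since a mismatch between the content type and the body fails the send, one option is to derive the content type from the value being sent. The helper below is a hypothetical sketch of that idea, following the pairing described above (String with "text", ArrayBuffer with "bytes", Date and Map with "v8", everything else "json"). It is not part of the Queues API.

```typescript
type QueuesContentType = "text" | "bytes" | "json" | "v8";

// Hypothetical helper: picks a content type from the runtime type of the body.
// It only inspects the top-level value; an object that merely contains a
// Date or Map somewhere inside would still be reported as "json".
function contentTypeFor(body: unknown): QueuesContentType {
  if (typeof body === "string") return "text";
  if (body instanceof ArrayBuffer) return "bytes";
  if (body instanceof Date || body instanceof Map) return "v8";
  return "json";
}
```

A producer could then call await env.MY_QUEUE.send(body, { contentType: contentTypeFor(body) }), assuming the MY_QUEUE binding from the earlier example.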
These APIs allow a consumer Worker to consume messages from a Queue.
To define a consumer Worker, add a queue() function to the default export of the Worker. This will allow it to receive messages from the Queue.
By default, all messages in the batch will be acknowledged as soon as all of the following conditions are met:
- The queue() function has returned.
- If the queue() function returned a promise, the promise has resolved.
- Any promises passed to waitUntil() have resolved.
If the queue() function throws, or if the promise it returns or any promise passed to waitUntil() is rejected, the entire batch is considered a failure and is retried according to the consumer's retry settings.
Note
waitUntil() is the only supported method to run tasks (such as logging or metrics calls) that resolve after a queue handler has completed. Promises that have not resolved by the time the queue handler returns may not complete and will not block completion of execution.
export default {
  async queue(
    batch: MessageBatch,
    env: Environment,
    ctx: ExecutionContext
  ): Promise<void> {
    for (const message of batch.messages) {
      console.log('Received', message);
    }
  },
};
The env and ctx fields are as documented in the Workers documentation.
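The earlier note about waitUntil() can be sketched as follows. This is an illustrative outline only: recordMetrics and metricsLog are hypothetical placeholders for a real logging or metrics call, and the interfaces are trimmed local stand-ins for the runtime types.

```typescript
// Trimmed local stand-ins for the runtime types, so the sketch is self-contained.
interface Message<Body = unknown> {
  readonly id: string;
  readonly body: Body;
}
interface MessageBatch<Body = unknown> {
  readonly queue: string;
  readonly messages: readonly Message<Body>[];
}
interface ExecutionContext {
  waitUntil(promise: Promise<unknown>): void;
}

// Hypothetical in-memory sink; a real Worker might call a logging service instead.
const metricsLog: string[] = [];
async function recordMetrics(queue: string, count: number): Promise<void> {
  metricsLog.push(`${queue}: processed ${count} message(s)`);
}

// In a Worker, this object would be the module's default export.
const consumer = {
  async queue(batch: MessageBatch, _env: unknown, ctx: ExecutionContext): Promise<void> {
    for (const message of batch.messages) {
      console.log("Received", message.id);
    }
    // Hand the follow-up task to waitUntil() instead of leaving the promise
    // floating, so it is tracked after the handler returns.
    ctx.waitUntil(recordMetrics(batch.queue, batch.messages.length));
  },
};
```

If the promise passed to waitUntil() rejects, the batch is treated as failed, so a task that should never trigger a retry may need its own error handling.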
A queue consumer can also be written using the (deprecated) service worker syntax:
addEventListener('queue', (event) => {
  event.waitUntil(handleMessages(event));
});
In service worker syntax, event provides the same fields and methods as MessageBatch, as defined below, in addition to waitUntil().
Note
When your queue handler performs asynchronous work for each message, iterate in a way that waits for that work. For example, for (const m of batch.messages) with await inside the loop body, or await Promise.all(batch.messages.map(work)), waits for the results of asynchronous calls. batch.messages.forEach() does not.
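The difference between these iteration styles can be seen in a small sketch. The work function below is a hypothetical stand-in for real per-message processing, and Message is a trimmed local copy of the runtime type.

```typescript
// Trimmed local stand-in for the runtime Message type.
interface Message<Body = unknown> {
  readonly id: string;
  readonly body: Body;
}

// Hypothetical async task: waits briefly, then records the message ID.
async function work(message: Message<string>, done: string[]): Promise<void> {
  await new Promise<void>((resolve) => setTimeout(resolve, 5));
  done.push(message.id);
}

// Sequential: each message finishes before the next one starts.
async function processSequentially(messages: readonly Message<string>[], done: string[]): Promise<void> {
  for (const m of messages) {
    await work(m, done);
  }
}

// Concurrent: all messages start together and the function waits for all of them.
async function processConcurrently(messages: readonly Message<string>[], done: string[]): Promise<void> {
  await Promise.all(messages.map((m) => work(m, done)));
}

// Anti-pattern: forEach() discards the promises, so this function returns
// before any of the work has completed.
async function processWithForEach(messages: readonly Message<string>[], done: string[]): Promise<void> {
  messages.forEach((m) => {
    void work(m, done);
  });
}
```

With the first two functions, the handler's promise resolves only after every message has been processed. With the forEach() variant it resolves immediately, so the batch could be acknowledged while work is still in flight.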
A batch of messages that are sent to a consumer Worker.
interface MessageBatch<Body = unknown> {
  readonly queue: string;
  readonly messages: Message<Body>[];
  ackAll(): void;
  retryAll(options?: QueueRetryOptions): void;
}
- queue: string
- messages: Message[]
- ackAll(): void
  Marks every message as successfully delivered, regardless of whether your queue() consumer handler returns successfully or not.
- retryAll(options?: QueueRetryOptions): void
  Marks every message to be retried in the next batch. Supports an optional options object.
A message that is sent to a consumer Worker.
interface Message<Body = unknown> {
  readonly id: string;
  readonly timestamp: Date;
  readonly body: Body;
  readonly attempts: number;
  ack(): void;
  retry(options?: QueueRetryOptions): void;
}
- id: string
- timestamp: Date
- body: unknown
- attempts: number
- ack(): void
  Marks a message as successfully delivered, regardless of whether your queue() consumer handler returns successfully or not.
- retry(options?: QueueRetryOptions): void
  Marks a message to be retried in the next batch. Supports an optional options object.
Optional configuration when marking a message or a batch of messages for retry.
interface QueueRetryOptions {
  delaySeconds?: number;
}
- delaySeconds: number
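Putting ack(), retry(), and QueueRetryOptions together, a consumer can settle each message individually instead of relying on the batch-level default. The sketch below is one possible approach and not a prescribed pattern. The backoffSeconds policy, its one-hour cap, and the handleBatch helper are all hypothetical, and the sketch assumes attempts starts at 1 on the first delivery.

```typescript
// Local copies of the documented interfaces so the sketch is self-contained.
interface QueueRetryOptions {
  delaySeconds?: number;
}
interface Message<Body = unknown> {
  readonly id: string;
  readonly timestamp: Date;
  readonly body: Body;
  readonly attempts: number;
  ack(): void;
  retry(options?: QueueRetryOptions): void;
}
interface MessageBatch<Body = unknown> {
  readonly queue: string;
  readonly messages: Message<Body>[];
  ackAll(): void;
  retryAll(options?: QueueRetryOptions): void;
}

// Hypothetical backoff policy: 10s, 20s, 40s, ... capped at one hour.
// Assumes attempts is 1 on the first delivery.
function backoffSeconds(attempts: number): number {
  return Math.min(3600, 10 * 2 ** Math.max(0, attempts - 1));
}

// Acknowledges each message that is handled successfully and schedules a
// delayed retry for each one whose handler throws.
async function handleBatch(
  batch: MessageBatch<string>,
  handle: (body: string) => Promise<void>
): Promise<void> {
  for (const message of batch.messages) {
    try {
      await handle(message.body);
      message.ack();
    } catch {
      message.retry({ delaySeconds: backoffSeconds(message.attempts) });
    }
  }
}
```

A consumer's queue() handler could delegate to handleBatch(batch, handle), so one failing message does not cause the whole batch to be retried.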