QueueConsumer

Creates a consumer for a Cloudflare Queue that processes messages using a Worker.

Create a basic queue consumer with default settings.

import { Queue, QueueConsumer } from "alchemy/cloudflare";
const queue = await Queue("notifications", {
  name: "notifications",
});
const consumer = await QueueConsumer("notification-processor", {
  queue,
  scriptName: "notification-worker",
});

Configure batch size, concurrency, retries and other settings.

import { Queue, QueueConsumer } from "alchemy/cloudflare";
// Queue to consume from; the name "batch-queue" is illustrative
const queue = await Queue("batch-queue", {
  name: "batch-queue",
});
const consumer = await QueueConsumer("batch-processor", {
  queue,
  scriptName: "batch-worker",
  settings: {
    batchSize: 50, // Process 50 messages at once
    maxConcurrency: 10, // Allow 10 concurrent invocations
    maxRetries: 5, // Retry failed messages up to 5 times
    maxWaitTimeMs: 2000, // Wait up to 2 seconds to fill a batch
    retryDelay: 60, // Wait 60 seconds before retrying failed messages
  },
});
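
How batchSize and maxWaitTimeMs interact can be easier to see as a small model. The sketch below is not part of Alchemy or the Cloudflare runtime. `BatchSettings` and `shouldDeliver` are hypothetical names, and the rule it encodes (deliver once the batch is full or the wait time has elapsed, whichever comes first, and never deliver an empty batch) is a simplified reading of the two settings.

```typescript
// Hypothetical model of the two batching settings shown above.
interface BatchSettings {
  batchSize: number; // maximum messages per batch
  maxWaitTimeMs: number; // maximum time to wait for a batch to fill
}

// Returns true when a batch would be handed to the consumer under this model.
function shouldDeliver(
  pendingMessages: number,
  waitedMs: number,
  settings: BatchSettings,
): boolean {
  if (pendingMessages === 0) {
    return false; // assumption: nothing to deliver, so no invocation
  }
  const batchIsFull = pendingMessages >= settings.batchSize;
  const waitedLongEnough = waitedMs >= settings.maxWaitTimeMs;
  return batchIsFull || waitedLongEnough;
}

const settings: BatchSettings = { batchSize: 50, maxWaitTimeMs: 2000 };
console.log(shouldDeliver(50, 100, settings)); // full batch, short wait
console.log(shouldDeliver(10, 2000, settings)); // partial batch, wait elapsed
```

Under this model, a larger batchSize with a short maxWaitTimeMs favors latency over batch fullness, while a long maxWaitTimeMs favors fuller batches on low-traffic queues.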

Bind a queue consumer to a worker.

import { Queue, Worker, QueueConsumer } from "alchemy/cloudflare";
// Queue to consume from; the name "my-queue" is illustrative
const queue = await Queue("my-queue", {
  name: "my-queue",
});
const consumer = await QueueConsumer("my-consumer", {
  queue,
  scriptName: "my-worker",
});
await Worker("my-worker", {
  name: "my-worker",
  script: "console.log('Hello, world!')",
  bindings: {
    myConsumer: consumer,
  },
});

Preferred: Configure via Worker eventSources

The recommended approach is to configure queue consumers through Worker eventSources:

import { Worker, Queue } from "alchemy/cloudflare";
const queue = await Queue("task-queue", {
  name: "task-queue",
});
await Worker("task-processor", {
  entrypoint: "./src/processor.ts",
  // Configure queue consumer via eventSources
  eventSources: [{
    queue,
    settings: {
      batchSize: 20, // Process 20 messages at once
      maxConcurrency: 4, // Allow 4 concurrent invocations
      maxRetries: 3, // Retry failed messages up to 3 times
      maxWaitTimeMs: 1000, // Wait up to 1 second to fill a batch
      retryDelay: 45, // Wait 45 seconds before retrying failed messages
      deadLetterQueue: "failed-tasks" // Send failed messages to DLQ
    }
  }]
});
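
The configuration above only wires the queue to the Worker; the entrypoint still needs a queue handler. What follows is a minimal sketch of what ./src/processor.ts might contain, not the documented implementation. The `Task` shape, `handleTask`, and `processBatch` are hypothetical, and the `Message` and `MessageBatch` interfaces are simplified stand-ins for the Workers runtime types so the example is self-contained.

```typescript
// Simplified stand-ins for the Workers runtime types
// (a real project would take these from @cloudflare/workers-types).
interface Message<Body> {
  readonly id: string;
  readonly body: Body;
  ack(): void; // mark this message as processed
  retry(): void; // ask the queue to redeliver this message
}

interface MessageBatch<Body> {
  readonly queue: string;
  readonly messages: readonly Message<Body>[];
}

// Hypothetical message shape for this example.
interface Task {
  kind: string;
}

// Hypothetical business logic: throws on a task it cannot handle.
function handleTask(task: Task): void {
  if (task.kind === "unknown") {
    throw new Error(`unsupported task kind: ${task.kind}`);
  }
}

// Acknowledge or retry each message individually, so a single failure
// does not cause the whole batch to be redelivered.
function processBatch(batch: MessageBatch<Task>): { acked: number; retried: number } {
  let acked = 0;
  let retried = 0;
  for (const message of batch.messages) {
    try {
      handleTask(message.body);
      message.ack();
      acked++;
    } catch {
      message.retry();
      retried++;
    }
  }
  return { acked, retried };
}

// The Worker's queue handler: invoked once per delivered batch.
const handler = {
  async queue(batch: MessageBatch<Task>): Promise<void> {
    processBatch(batch);
  },
};

export default handler;
```

With the settings shown earlier, a message that keeps being retried would be redelivered after retryDelay seconds until maxRetries is exhausted, and then routed to the configured dead letter queue.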

Configure a Worker as both a queue producer and consumer:

import { Worker, Queue } from "alchemy/cloudflare";
const queue = await Queue("workflow-queue", {
  name: "workflow-queue",
});
await Worker("workflow-processor", {
  entrypoint: "./src/workflow.ts",
  bindings: {
    QUEUE: queue // Producer: bind queue for sending messages
  },
  eventSources: [{ // Consumer: configure processing settings
    queue,
    settings: {
      batchSize: 5,
      maxConcurrency: 2,
      maxWaitTimeMs: 3000,
      maxRetries: 2,
      retryDelay: 60
    }
  }]
});
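
To illustrate the dual role, here is a rough sketch of what ./src/workflow.ts could look like when each processed message enqueues the next step on the same queue. It is an assumption-laden example: `WorkflowStep`, `LAST_STEP`, and the `QueueProducer` interface are hypothetical or simplified stand-ins, and only the `QUEUE` binding name comes from the configuration above.

```typescript
// Simplified stand-ins for the Workers runtime types.
interface QueueProducer<Body> {
  send(body: Body): Promise<void>;
}

interface Message<Body> {
  readonly body: Body;
  ack(): void;
}

interface MessageBatch<Body> {
  readonly messages: readonly Message<Body>[];
}

// Hypothetical message shape for this example.
interface WorkflowStep {
  workflowId: string;
  step: number;
}

// QUEUE matches the key used under `bindings` in the Worker definition.
interface Env {
  QUEUE: QueueProducer<WorkflowStep>;
}

// Hypothetical: workflows in this sketch finish after three steps.
const LAST_STEP = 3;

const handler = {
  // Consumer side: called with each delivered batch.
  async queue(batch: MessageBatch<WorkflowStep>, env: Env): Promise<void> {
    for (const message of batch.messages) {
      const { workflowId, step } = message.body;
      if (step < LAST_STEP) {
        // Producer side: enqueue the follow-up step on the same queue.
        await env.QUEUE.send({ workflowId, step: step + 1 });
      }
      message.ack();
    }
  },
};

export default handler;
```

Sending the next step before acknowledging the current one means a failed send leaves the current message unacknowledged, so it can be retried rather than silently dropping the rest of the workflow.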