Queue
This guide explains how to create and use Cloudflare Queues with your Worker applications.
Create a Queue
Section titled “Create a Queue”Create a Queue with a type for the message payload:
import { Queue } from "alchemy/cloudflare";
// Define the message payload typeexport const queue = await Queue<{ name: string; email: string;}>("my-worker-queue");
// Create a dead letter queue for failed messagesexport const dlq = await Queue("failed-messages-dlq");
// Create main queue with DLQ supportexport const queueWithDlq = await Queue<{ name: string; email: string;}>("my-worker-queue-with-dlq", { dlq: dlq // or dlq: "failed-messages-dlq"});
Configure Queue Producer
Section titled “Configure Queue Producer”Bind the Queue to your Worker as an environment variable to send messages.
import { Worker } from "alchemy/cloudflare";
export const worker = await Worker("my-worker", { entrypoint: "./src/worker.ts", bindings: { QUEUE: queue, // Bind queue as QUEUE environment variable },});
Send Messages Using Producer
Section titled “Send Messages Using Producer”Access the Queue from your Worker’s fetch handler to send messages.
import type { worker } from "../alchemy.run";
export default { async fetch(request: Request, env: typeof worker.Env) { // Send a message to the queue await env.QUEUE.send({ name: "John Doe", email: "john.doe@example.com", });
return new Response("Ok"); },};
Configure Queue Consumer
Section titled “Configure Queue Consumer”Register your Worker as a consumer of the Queue by adding it to eventSources.
import { Worker } from "alchemy/cloudflare";
export const worker = await Worker("my-worker", { // add the event source eventSources: [queue],});
Configure Consumer Settings
Section titled “Configure Consumer Settings”You can customize how your Worker consumes messages by providing settings:
export const worker = await Worker("my-worker", { eventSources: [{ queue, settings: { batchSize: 25, // Process 25 messages at once maxConcurrency: 5, // Allow 5 concurrent invocations maxRetries: 3, // Retry failed messages up to 3 times maxWaitTimeMs: 1500, // Wait up to 1.5 seconds to fill a batch retryDelay: 45, // Wait 45 seconds before retrying failed messages deadLetterQueue: dlq // Send failed messages to dead letter queue } }]});
Process Messages Using Consumer
Section titled “Process Messages Using Consumer”Implement the queue handler using a type-safe batch parameter.
import type { queue, worker } from "../alchemy.run";
export default { // other handlers like fetch...
// Process queue messages with proper type safety async queue(batch: typeof queue.Batch, env: typeof worker.Env) { // Process each message in the batch for (const message of batch.messages) { console.log(message); // Acknowledge individual message message.ack(); }
// Or acknowledge all messages at once // batch.ackAll(); },};
Generate Wrangler Config
Section titled “Generate Wrangler Config”Create the Wrangler configuration file to include the Queue binding.
import { WranglerJson } from "alchemy/cloudflare";
await WranglerJson("wrangler.jsonc", { worker,});
Type-safe Environment
Section titled “Type-safe Environment”Set up type definitions for your Worker environment for better type safety.
import type { worker } from "./alchemy.run";
export type WorkerEnv = typeof worker.Env;
declare module "cloudflare:workers" { namespace Cloudflare { export interface Env extends WorkerEnv {} }}
Complete Example
Section titled “Complete Example”A complete implementation example showing both producer and consumer roles in the same Worker.
import alchemy from "alchemy";import { Queue, Worker, WranglerJson } from "alchemy/cloudflare";
const app = await alchemy("queue-example");
// Create a typed queue with exportexport const queue = await Queue<{ name: string; email: string;}>("example-worker-queue");
// Create worker as both producer and consumerexport const worker = await Worker("example-worker", { entrypoint: "./src/worker.ts", bindings: { QUEUE: queue, // Producer: bind queue for sending messages }, eventSources: [queue], // Consumer: register worker to receive messages});
// Generate wrangler configawait WranglerJson("wrangler.jsonc", { worker,});
await app.finalize();
import type { queue, worker } from "../alchemy.run";
export default { // Producer: send messages async fetch(request: Request, env: typeof worker.Env) { await env.QUEUE.send({ name: "John Doe", email: "john.doe@example.com", }); return new Response("Ok"); },
// Consumer: process messages with type-safe batch async queue(batch: typeof queue.Batch, env: typeof worker.Env) { for (const message of batch.messages) { console.log(message); message.ack(); } batch.ackAll(); },};