Skip to content
GitHubXDiscordRSS

R2 Bucket Notifications

Learn how to process R2 bucket events with Cloudflare Queues and Workers using Alchemy.

This guide explains how to set up event notifications for R2 buckets and process them with a Worker via a Cloudflare Queue.

  1. Create an R2 Bucket and Queue

    Create an R2 bucket and a typed Queue to receive notification messages:

    import { R2Bucket, Queue, R2BucketNotificationMessage } from "alchemy/cloudflare";
    export const bucket = await R2Bucket("uploads");
    export const queue = await Queue<R2BucketNotificationMessage>("upload-events");
  2. Create the Notification Rule

    Connect the bucket to the queue with an R2BucketNotification:

    import { R2BucketNotification } from "alchemy/cloudflare";
    await R2BucketNotification("upload-notifications", {
    bucket,
    queue,
    eventTypes: ["object-create"],
    });
  3. Create a Worker to Process Events

    Create a Worker that consumes messages from the queue:

    import { Worker } from "alchemy/cloudflare";
    export const worker = await Worker("processor", {
    entrypoint: "./src/processor.ts",
    bindings: {
    BUCKET: bucket,
    },
    eventSources: [queue],
    });
  4. Implement the Event Handler

    Process bucket events in your Worker:

    src/processor.ts
    import type { queue, worker } from "../alchemy.run";
    export default {
    async queue(batch: typeof queue.Batch, env: typeof worker.Env) {
    for (const message of batch.messages) {
    const event = message.body;
    console.log(`Event: ${event.action} on ${event.bucket}/${event.object.key}`);
    if (event.action === "PutObject" || event.action === "CompleteMultipartUpload") {
    // Object was created - process it
    const object = await env.BUCKET.get(event.object.key);
    if (object) {
    console.log(`Processing ${event.object.key} (${event.object.size} bytes)`);
    // Do something with the object...
    }
    }
    message.ack();
    }
    },
    };
  5. (Optional) Filter Notifications

    Only receive notifications for specific objects using prefix and suffix filters:

    await R2BucketNotification("pdf-uploads", {
    bucket,
    queue,
    eventTypes: ["object-create"],
    prefix: "documents/",
    suffix: ".pdf",
    });
  6. (Optional) Handle Multiple File Types

    Need to process multiple file formats? Use an array of suffixes to create rules for each:

    await R2BucketNotification("audio-uploads", {
    bucket,
    queue,
    eventTypes: ["object-create"],
    prefix: "audio/",
    suffix: [".mp3", ".wav", ".flac", ".aac"],
    });

    This creates separate notification rules for each suffix under the hood. You can also use an array of prefixes:

    await R2BucketNotification("multi-folder-uploads", {
    bucket,
    queue,
    eventTypes: ["object-create"],
    prefix: ["uploads/", "imports/", "inbox/"],
    suffix: ".json",
    });
  7. (Optional) Handle Multiple Event Types

    Listen for both create and delete events:

    await R2BucketNotification("all-events", {
    bucket,
    queue,
    eventTypes: ["object-create", "object-delete"],
    });

    Then handle different actions in your worker:

    async queue(batch: typeof queue.Batch, env: typeof worker.Env) {
    for (const message of batch.messages) {
    const event = message.body;
    switch (event.action) {
    case "PutObject":
    case "CompleteMultipartUpload":
    case "CopyObject":
    console.log(`Object created: ${event.object.key}`);
    break;
    case "DeleteObject":
    case "LifecycleDeletion":
    console.log(`Object deleted: ${event.object.key}`);
    break;
    }
    message.ack();
    }
    }

Here’s a complete alchemy.run.ts file that sets up R2 notifications:

import { alchemy } from "alchemy";
import {
R2Bucket,
Queue,
R2BucketNotification,
Worker,
R2BucketNotificationMessage,
} from "alchemy/cloudflare";
const app = await alchemy("r2-notifications-demo");
export const bucket = await R2Bucket("uploads");
export const queue = await Queue<R2BucketNotificationMessage>("upload-events");
await R2BucketNotification("upload-notifications", {
bucket,
queue,
eventTypes: ["object-create"],
prefix: "incoming/",
});
export const worker = await Worker("processor", {
entrypoint: "./src/processor.ts",
bindings: {
BUCKET: bucket,
},
eventSources: [queue],
url: true,
});
console.log("Worker URL:", worker.url);
await app.finalize();

Each notification message contains:

FieldTypeDescription
accountstringCloudflare account ID
actionstringAction that triggered the event
bucketstringBucket name
object.keystringObject key/path
object.sizenumber?Size in bytes (not present for deletes)
object.eTagstring?Entity tag (not present for deletes)
eventTimestringISO 8601 timestamp
copySourceobject?Source info for CopyObject events
Event TypeActions
object-createPutObject, CompleteMultipartUpload, CopyObject
object-deleteDeleteObject, LifecycleDeletion