Skip to content

Pipeline

The Pipeline resource lets you create Cloudflare Pipeline data pipelines for collecting, transforming and routing data to destinations like R2 buckets.

Minimal Example

Create a basic pipeline that routes data to an R2 bucket:

ts
import { Pipeline, R2Bucket } from "alchemy/cloudflare";

const bucket = await R2Bucket("logs-bucket", {
  name: "logs-bucket"
});

const pipeline = await Pipeline("logs-pipeline", {
  name: "logs-pipeline", 
  destination: {
    type: "r2",
    format: "json",
    path: {
      bucket: bucket.name,
      prefix: "app-logs"
    },
    credentials: {
      accessKeyId: alchemy.secret(process.env.R2_ACCESS_KEY_ID!),
      secretAccessKey: alchemy.secret(process.env.R2_SECRET_ACCESS_KEY!)
    }
  }
});

Custom Source Configuration

Configure custom HTTP source with CORS and authentication:

ts
import { Pipeline } from "alchemy/cloudflare";

const pipeline = await Pipeline("custom-pipeline", {
  name: "custom-pipeline",
  source: [{
    type: "http",
    format: "json", 
    authentication: true,
    cors: {
      origins: ["https://example.com"]
    }
  }],
  destination: {
    type: "r2",
    format: "json",
    path: {
      bucket: "my-bucket",
      prefix: "data"
    },
    credentials: {
      accessKeyId: alchemy.secret(process.env.R2_ACCESS_KEY_ID!),
      secretAccessKey: alchemy.secret(process.env.R2_SECRET_ACCESS_KEY!)
    }
  }
});

Batch Configuration

Configure batching behavior for pipeline output:

ts
import { Pipeline } from "alchemy/cloudflare";

const pipeline = await Pipeline("batch-pipeline", {
  name: "batch-pipeline",
  destination: {
    type: "r2",
    format: "json",
    path: {
      bucket: "my-bucket"
    },
    credentials: {
      accessKeyId: alchemy.secret(process.env.R2_ACCESS_KEY_ID!),
      secretAccessKey: alchemy.secret(process.env.R2_SECRET_ACCESS_KEY!)
    },
    batch: {
      maxMb: 50,
      maxRows: 1000000,
      maxSeconds: 60
    }
  }
});

Bind to a Worker

Use the pipeline in a Cloudflare Worker:

ts
import { Worker, Pipeline } from "alchemy/cloudflare";

const pipeline = await Pipeline("logs", {
  name: "logs-pipeline",
  destination: {
    type: "r2",
    format: "json",
    path: {
      bucket: "logs"
    },
    credentials: {
      accessKeyId: alchemy.secret(process.env.R2_ACCESS_KEY_ID!),
      secretAccessKey: alchemy.secret(process.env.R2_SECRET_ACCESS_KEY!)
    }
  }
});

await Worker("api", {
  name: "api-worker",
  script: "console.log('Hello, world!')",
  bindings: {
    LOGS: pipeline
  }
});