LogPushJob

Creates and manages Cloudflare LogPush Jobs for streaming logs to external destinations.

LogPush jobs can be scoped to either an account or a zone, and support various log datasets like HTTP requests, firewall events, DNS logs, Workers trace events, and more.

Stream HTTP request logs to an R2 bucket:

import { LogPushJob, R2Bucket } from "alchemy/cloudflare";

const bucket = await R2Bucket("logs-bucket", {
  name: "my-logs-bucket",
});

const httpLogs = await LogPushJob("http-logs", {
  dataset: "http_requests",
  destination: bucket, // Automatically constructs R2 URL with credentials
});

Organize logs by date using path variables:

import { LogPushJob, R2Bucket } from "alchemy/cloudflare";

const bucket = await R2Bucket("logs-bucket", {
  name: "my-logs-bucket",
});

const organizedLogs = await LogPushJob("organized-logs", {
  dataset: "http_requests",
  destination: bucket,
  name: "HTTP Logs - Daily",
});

The destination will automatically include a {DATE} path variable: r2://my-logs-bucket/logs/{DATE}/...

Alternatively, use a direct destination string with embedded credentials:

import { LogPushJob } from "alchemy/cloudflare";

// Credentials sourced from environment variables (placeholder names)
const accountId = process.env.CLOUDFLARE_ACCOUNT_ID!;
const accessKeyId = process.env.R2_ACCESS_KEY_ID!;
const secretAccessKey = process.env.R2_SECRET_ACCESS_KEY!;

const logsToR2 = await LogPushJob("http-to-r2", {
  dataset: "http_requests",
  destination: `r2://my-logs-bucket/logs/{DATE}?account-id=${accountId}&access-key-id=${accessKeyId}&secret-access-key=${secretAccessKey}`,
});

Create a LogPush job scoped to a specific zone:

import { LogPushJob, Zone } from "alchemy/cloudflare";

const zone = await Zone("example-zone", {
  name: "example.com",
  type: "full",
});

const zoneLogs = await LogPushJob("zone-http-logs", {
  zone, // accepts a Zone resource or a zone ID string
  dataset: "http_requests",
  destination: "https://logs.example.com/cloudflare/zone",
  name: "Zone HTTP Logs",
});

Create an account-level LogPush job (the default when no zone is specified):

import { LogPushJob } from "alchemy/cloudflare";

const accountLogs = await LogPushJob("account-firewall-logs", {
  dataset: "firewall_events",
  destination: "https://logs.example.com/cloudflare/firewall",
  name: "Account Firewall Logs",
});

Stream Workers trace events to R2 for performance analysis and debugging:

import { LogPushJob, R2Bucket, Worker } from "alchemy/cloudflare";

// Create R2 bucket for worker logs
const bucket = await R2Bucket("workers-logs", {
  name: "workers-trace-logs",
});

// Enable LogPush on your worker
const worker = await Worker("api", {
  entrypoint: "./src/api.ts",
  logpush: true, // Enable trace event collection
});

// Stream trace events to R2
const workerLogs = await LogPushJob("worker-traces", {
  dataset: "workers_trace_events",
  destination: bucket,
  outputOptions: {
    outputType: "ndjson",
    fieldNames: [
      "Event",
      "EventTimestampMs",
      "Outcome",
      "ScriptName",
      "Logs",
      "Exceptions",
      "DispatchNamespace",
    ],
  },
});

Filter logs based on specific conditions:

import { LogPushJob } from "alchemy/cloudflare";

const blockedRequests = await LogPushJob("blocked-requests", {
  dataset: "firewall_events",
  destination: "https://logs.example.com/security/blocked",
  filter: '{"where":{"and":[{"key":"Action","operator":"eq","value":"block"}]}}',
  name: "Blocked Requests",
});

Filter by country (excluding Canada):

const nonCanadianLogs = await LogPushJob("non-canadian-logs", {
  dataset: "http_requests",
  destination: "https://logs.example.com/http/filtered",
  filter: '{"where":{"and":[{"key":"ClientCountry","operator":"neq","value":"ca"}]}}',
});
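
Because filter is a JSON string, one option is to build the filter as an object and serialize it with JSON.stringify, which avoids hand-escaping mistakes. A minimal sketch reproducing the blocked-requests filter above (the job name is a placeholder):

import { LogPushJob } from "alchemy/cloudflare";

// Build the filter as a plain object, then serialize it for the job
const filter = JSON.stringify({
  where: {
    and: [{ key: "Action", operator: "eq", value: "block" }],
  },
});

const blockedViaStringify = await LogPushJob("blocked-requests-json", {
  dataset: "firewall_events",
  destination: "https://logs.example.com/security/blocked",
  filter,
});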

Configure custom output format with specific fields:

import { LogPushJob } from "alchemy/cloudflare";

const customFormat = await LogPushJob("custom-format-logs", {
  dataset: "http_requests",
  destination: "https://analytics.example.com/ingest/custom",
  outputOptions: {
    outputType: "ndjson",
    timestampFormat: "unixnano",
    fieldNames: [
      "ClientIP",
      "ClientRequestHost",
      "ClientRequestMethod",
      "ClientRequestURI",
      "EdgeResponseStatus",
      "EdgeStartTimestamp",
    ],
  },
});

CSV output with custom delimiter:

const csvLogs = await LogPushJob("csv-logs", {
  dataset: "http_requests",
  destination: "https://analytics.example.com/ingest/csv",
  outputOptions: {
    outputType: "csv",
    fieldDelimiter: "|",
    timestampFormat: "rfc3339",
    fieldNames: ["ClientIP", "EdgeResponseStatus", "EdgeStartTimestamp"],
  },
});

Sample high-volume logs to reduce storage costs:

import { LogPushJob } from "alchemy/cloudflare";

const sampledLogs = await LogPushJob("sampled-logs", {
  dataset: "http_requests",
  destination: "https://analytics.example.com/sampled",
  outputOptions: {
    sampleRate: 0.1, // 10% sampling
    outputType: "ndjson",
    fieldNames: ["ClientIP", "EdgeResponseStatus", "EdgeStartTimestamp"],
  },
});

Configure batch size, interval, and record limits:

import { LogPushJob } from "alchemy/cloudflare";

const batchedLogs = await LogPushJob("batched-logs", {
  dataset: "http_requests",
  destination: "https://logs.example.com/batched",
  maxUploadBytes: 100 * 1024 * 1024, // 100MB batches
  maxUploadIntervalSeconds: 300, // 5 minutes max interval
  maxUploadRecords: 100000, // 100k records per batch
});

Batch Settings:

Setting                    Range                      Default   Purpose
maxUploadBytes             5 MB - 1 GB, or 0          -         Max uncompressed file size
maxUploadIntervalSeconds   30 - 300 s, or 0           -         Max time before sending a batch
maxUploadRecords           1,000 - 1,000,000, or 0    -         Max log lines per batch

Use a custom template for log formatting:

import { LogPushJob } from "alchemy/cloudflare";

const templateLogs = await LogPushJob("template-logs", {
  dataset: "http_requests",
  destination: "https://logs.example.com/template",
  outputOptions: {
    fieldNames: ["ClientIP", "EdgeResponseStatus", "EdgeStartTimestamp"],
    recordTemplate: '{"ip":"{{.ClientIP}}","status":{{.EdgeResponseStatus}},"time":{{.EdgeStartTimestamp}}}',
    recordDelimiter: "\n",
  },
});

LogPush supports the following datasets:

Zone-Level:

  • http_requests - HTTP request logs
  • firewall_events - WAF and firewall events
  • dns_logs - Authoritative DNS logs
  • dns_firewall_logs - DNS firewall logs
  • spectrum_events - Spectrum application events
  • nel_reports - Network Error Logging reports

Account-Level:

  • workers_trace_events - Workers execution traces
  • audit_logs - Cloudflare audit logs
  • gateway_dns - Gateway DNS logs
  • gateway_http - Gateway HTTP logs
  • gateway_network - Gateway network logs
  • access_requests - Access authentication logs
  • casb_findings - CASB security findings
  • device_posture_results - Zero Trust device posture
  • zero_trust_network_sessions - Network session logs
  • magic_ids_detections - Magic IDS detections
  • page_shield_events - Page Shield events
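
For example, a sketch pairing one dataset from each list (the zone, endpoints, and job names are placeholders):

import { LogPushJob, Zone } from "alchemy/cloudflare";

// Zone-level dataset: authoritative DNS logs, scoped to a zone
const zone = await Zone("example-zone", {
  name: "example.com",
  type: "full",
});

const dnsLogs = await LogPushJob("dns-logs", {
  zone,
  dataset: "dns_logs",
  destination: "https://logs.example.com/cloudflare/dns",
});

// Account-level dataset: audit logs need no zone
const auditLogs = await LogPushJob("audit-logs", {
  dataset: "audit_logs",
  destination: "https://logs.example.com/cloudflare/audit",
});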

LogPush supports multiple destination types:

HTTPS Endpoints:

https://logs.example.com/cloudflare/endpoint

Sumo Logic:

https://endpoint.sumologic.com/receiver/v1/http/xxx

Splunk:

https://http-inputs-example.splunkcloud.com/services/collector/raw?channel=xxx

Datadog (use HTTPS format):

https://http-intake.logs.datadoghq.com/api/v2/logs?dd-api-key=xxx&ddsource=cloudflare

R2 (recommended; use the R2Bucket resource):

r2://bucket-name/path?account-id=xxx&access-key-id=xxx&secret-access-key=xxx
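
As a sketch, the Datadog format above plugged into a job; DD_API_KEY is a placeholder environment variable, not part of the resource API:

import { LogPushJob } from "alchemy/cloudflare";

// Hypothetical: interpolate the API key from the environment into
// the Datadog intake URL shown above
const datadogLogs = await LogPushJob("datadog-http-logs", {
  dataset: "http_requests",
  destination: `https://http-intake.logs.datadoghq.com/api/v2/logs?dd-api-key=${process.env.DD_API_KEY}&ddsource=cloudflare`,
  name: "Datadog HTTP Logs",
});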