
Pipeline

The data pipeline follows a domain + analytics split: Workers handle PG reads/writes for domain operations, then push enriched events to Cloudflare Pipelines for analytical retention on R2. Pipeline orchestration uses CF Workflows + Durable Objects — no pipeline state tables in PG.

[BFO: Process chain linked by precedes | PKO: ProcedureExecution with Steps | IAO: IBEs → typed Measurement Data]

Source events (Shopify webhooks, platform interactions, cron sync)
          │
┌─────────▼──────────────────────────┐
│ Worker enrichment                  │  PG reads/writes (domain operations)
│ (order_header, product, payment,   │  + event denormalization
│  publisher, fulfillment tables)    │
└─────────┬──────────────────────────┘
          │ enriched events
┌─────────▼──────────────────────────┐
│ Cloudflare Pipelines               │  SQL transforms per stream
│ Three telemetry streams (ADR-008)  │  → R2 Data Catalog (Iceberg/Parquet)
└─────────┬──────────────────────────┘
          │ Iceberg REST catalog
┌─────────▼──────────────────────────┐
│ DuckDB over R2                     │  Sliding windows, aggregations,
│ (WASM / Container / MotherDuck)    │  nominal classifications
└────────────────────────────────────┘

┌────────────────────────────────────┐
│ Workers Analytics Engine           │  Real-time counters (clicks,
│ (parallel, not sequential)         │  impressions, page views)
└────────────────────────────────────┘

See ADR-006 through ADR-009 for the rationale behind this architecture.

Workers are the active processing layer. They perform domain operations (PG reads/writes) before pushing enriched events to Pipeline streams (ADR-007). Events arrive on R2 already denormalized — analytical queries never need to join back to PG.

Commerce telemetry is triggered by Shopify webhooks or the Daily Shopify Bulk Sync scenario:

  1. Receive — Webhook or cron trigger arrives at Worker
  2. Domain write — Upsert into PG domain tables (order_header, order_line_item, product, payment, etc.)
  3. Enrich — Read back related data (customer, product metadata, fulfillment state) to denormalize
  4. Stream push — Push enriched event to COMMERCE_TELEMETRY Pipeline

Events: order_created, order_completed, payment_captured, payment_refunded, fulfillment_shipped, return_requested
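Step 3's denormalization can be sketched as a pure function. The row shapes and field names below (OrderRow, ProductMeta, product_title) are illustrative, not the actual schema:

```typescript
// Hypothetical row shapes read back from PG during enrichment.
type OrderRow = { order_id: string; product_id: string; total: number; updated_at: string };
type ProductMeta = { product_id: string; title: string; publisher_id: string };

// Step 3: fold related PG data into one self-contained event so that
// analytical queries over R2 never need to join back to PG.
function enrichOrderEvent(event: string, order: OrderRow, product: ProductMeta) {
  return {
    event,                            // e.g. "order_created"
    ...order,
    product_title: product.title,     // copied in from product metadata
    publisher_id: product.publisher_id,
    enriched_at: new Date().toISOString(),
  };
}
```

The enriched object is what gets pushed to the COMMERCE_TELEMETRY stream in step 4.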

Publication telemetry is triggered by platform interactions (clicks, impressions, views):

  1. Receive — Platform worker captures visitor interaction
  2. Stream push — Push to PUBLICATION_TELEMETRY Pipeline (may skip PG entirely for raw engagement events)
  3. Domain write (conditional) — Only writes to PG publication_telemetry for events that need domain-level tracking (referrals, conversions, attribution)

High-volume stream — Workers Analytics Engine handles real-time aggregation in parallel.
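The conditional domain write in step 3 amounts to a routing predicate. A minimal sketch, assuming the event-type names (the actual set of domain-tracked events may differ):

```typescript
// Events that need domain-level tracking in PG publication_telemetry;
// raw engagement events skip PG and go straight to the stream.
const DOMAIN_TRACKED = new Set(["referral", "conversion", "attribution"]);

function needsDomainWrite(eventType: string): boolean {
  return DOMAIN_TRACKED.has(eventType);
}
```

Every event is pushed to PUBLICATION_TELEMETRY; only events passing this predicate also touch PG.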

Sync telemetry is emitted by CF Workflows at procedure boundaries:

  1. Workflow step completes — Record completion metadata (record counts, watermark positions, durations)
  2. Stream push — Push to SYNC_TELEMETRY Pipeline for long-term audit retention on R2
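The completion metadata recorded in step 1 might look like this; field names are assumptions, not the actual event schema:

```typescript
// Illustrative step-completion record pushed to SYNC_TELEMETRY.
function stepCompletionEvent(step: string, recordCount: number, watermark: string, durationMs: number) {
  return {
    stream: "SYNC_TELEMETRY",
    step,                       // which Workflow step completed
    record_count: recordCount,  // rows processed in this step
    watermark,                  // watermark position after the step
    duration_ms: durationMs,
  };
}
```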

Per ADR-008, three dedicated Pipeline streams with independent SQL transforms:

| Stream | Events | Volume | Iceberg table |
| --- | --- | --- | --- |
| COMMERCE_TELEMETRY | order_created, order_completed, payment_captured, payment_refunded, fulfillment_shipped, return_requested | Medium (order throughput) | commerce.telemetry |
| PUBLICATION_TELEMETRY | click, impression, view, engagement, conversion — enriched with publisher/item/program context | High (every visitor interaction) | publication.telemetry |
| SYNC_TELEMETRY | procedure completions with record counts, watermark positions, durations | Low (operational audit trail) | sync.telemetry |

Pipeline SQL transforms are stateless, row-level operations (field extraction, type coercion, timestamp normalization). Complex aggregation happens downstream in DuckDB.
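The actual transforms are SQL, but the row-level logic they perform can be illustrated in TypeScript: each row maps independently, with no state carried between rows. Field names here are illustrative:

```typescript
// Equivalent of a stateless row-level transform: field extraction,
// type coercion, timestamp normalization. No aggregation, no lookups.
function transformRow(raw: Record<string, unknown>) {
  return {
    order_id: String(raw["order_id"] ?? ""),         // field extraction + coercion
    total: Number(raw["total"] ?? 0),                // type coercion
    ts: new Date(String(raw["ts"])).toISOString(),   // timestamp normalization
  };
}
```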

| Iceberg table | Ontological type | What it holds | Partitioning |
| --- | --- | --- | --- |
| commerce.telemetry | Process Boundary (BFO) | Commerce lifecycle events | Daily |
| publication.telemetry | Process Boundary (BFO) | Engagement events | Hourly (high volume) |
| sync.telemetry | Process Record (IAO) | Pipeline execution audit trail | Daily |
| measurement.sales | Measurement Datum (IAO) | Daily sales aggregates by product x day (ratio scale) | Daily |
| measurement.performance | Dataset (IAO) | Aggregated metrics with sliding windows + nominal classification | Daily |
| configuration.objective_spec | Objective Specification (IAO) | Classification thresholds (replicated from PG macrodata_artifact) | Unpartitioned |

Per ADR-009, all pipeline orchestration uses Cloudflare Workflows — not PG tables.

The Daily Shopify Bulk Sync is a multi-step Workflow:

  1. Step: Initiate — Cron fires → Workflow starts. Reads watermark from Durable Object
  2. Step: Request — Initiates Shopify bulkOperationRunQuery (GraphQL)
  3. Step: Poll — Workflow sleeps/retries until bulk operation completes
  4. Step: Download — Downloads JSONL results, archives to R2 for audit/replay
  5. Step: Parse + Upsert — Parses JSONL and upserts into PG domain tables (order_header, order_line_item, product) in chunked batches
  6. Step: Enrich + Stream — Pushes enriched events to COMMERCE_TELEMETRY Pipeline
  7. Step: Advance Watermark — Updates watermark in Durable Object storage
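Step 5's chunked batching can be sketched as a small helper that keeps each PG upsert bounded; the batch size is illustrative:

```typescript
// Split parsed JSONL rows into fixed-size batches so each upsert
// statement stays within a bounded row count.
function* chunks<T>(rows: T[], size = 500): Generator<T[]> {
  for (let i = 0; i < rows.length; i += size) {
    yield rows.slice(i, i + size);
  }
}
```

Each yielded batch would be upserted inside its own Workflow step retry boundary.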

The Measurement Refresh Workflow replaces the old PG-based mart refresh. It runs as a DuckDB query over R2 Iceberg tables:

  1. Step: Query — DuckDB reads commerce.telemetry Iceberg table, aggregates by product x day
  2. Step: Compute — Calculate 7-day, 14-day, 30-day windows and aggregate measurement values
  3. Step: Classify — Apply nominal classification logic against configuration.objective_spec thresholds
  4. Step: Write — Append/overwrite into measurement.sales and measurement.performance Iceberg tables

Schedule: Hourly via Cron Trigger → Workflow.
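The Compute step's sliding windows reduce to a trailing sum over daily rows. A sketch with illustrative shapes (the real aggregation runs in DuckDB SQL):

```typescript
type DailySales = { day: string; units: number }; // day = "YYYY-MM-DD"

// Sum units over the trailing `days` window ending at `asOf` (exclusive of
// the day exactly `days` back, inclusive of asOf itself).
function windowSum(rows: DailySales[], asOf: string, days: number): number {
  const end = Date.parse(asOf);
  const start = end - days * 86_400_000;
  return rows
    .filter((r) => {
      const t = Date.parse(r.day);
      return t > start && t <= end;
    })
    .reduce((sum, r) => sum + r.units, 0);
}
```

The 7-day, 14-day, and 30-day windows are three calls with different `days` values over the same rows.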

Classification rules come from objective_specification thresholds (stored as macrodata_artifact in PG, replicated to R2 for query co-location):

| Nominal Classification | Rule | CCO Measurement Scale |
| --- | --- | --- |
| top_riser | sales_first_7_days >= threshold_7d AND sales_first_14_days >= threshold_14d | Nominal (derived from ratio) |
| winner | sales_last_30_days >= threshold_30d | Nominal (derived from ratio) |
| banana | Top N by aggregate_measurement_value (N from banana_count) | Nominal (derived from ordinal rank) |
| archive_candidate | is_active = false or business-defined staleness rules | Nominal |
| none | Default — does not meet any classification criteria | Nominal |
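The threshold-based rules can be sketched as a classifier; the rule precedence is an assumption, and the banana classification is omitted since it needs a rank over the whole population rather than per-row thresholds:

```typescript
type Thresholds = { threshold_7d: number; threshold_14d: number; threshold_30d: number };
type ProductAgg = {
  sales_first_7_days: number;
  sales_first_14_days: number;
  sales_last_30_days: number;
  is_active: boolean;
};

// Apply the per-row rules from objective_spec; first matching rule wins
// (precedence assumed, not specified by the table above).
function classify(p: ProductAgg, t: Thresholds): string {
  if (!p.is_active) return "archive_candidate";
  if (p.sales_first_7_days >= t.threshold_7d && p.sales_first_14_days >= t.threshold_14d) {
    return "top_riser";
  }
  if (p.sales_last_30_days >= t.threshold_30d) return "winner";
  return "none"; // default: meets no classification criteria
}
```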

Watermarks live in Durable Object persistent storage — not PG tables. Each Workflow reads the watermark from its associated Durable Object:

{
  "key": "orders_last_watermark",
  "value_json": { "ts": "2026-02-19T06:00:00Z" }
}

  • Hard floor: 128 days back — watermarks older than this reset to NOW() - 128 days
  • Advanced after successful Workflow step completion only
  • Used as updated_at filter in Shopify bulk queries
  • Enables incremental sync (only new/updated orders since last watermark)
  • Historical loads beyond 128 days use a separate full backfill Workflow
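The 128-day hard floor is a clamp applied when the watermark is read. A sketch, with "now" passed in explicitly to keep the function testable (the real read happens inside the Durable Object):

```typescript
const FLOOR_MS = 128 * 86_400_000; // 128 days in milliseconds

// Return the effective watermark: the stored value, unless it has fallen
// behind the hard floor, in which case reset to NOW() - 128 days.
function effectiveWatermark(storedIso: string, nowMs: number): string {
  const floor = nowMs - FLOOR_MS;
  return new Date(Math.max(Date.parse(storedIso), floor)).toISOString();
}
```

The returned timestamp is what becomes the updated_at filter in the Shopify bulk query.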

DuckDB queries R2 Iceberg tables via the R2 Data Catalog REST API. Three deployment options:

| Option | Use case | Constraints |
| --- | --- | --- |
| WASM in Workers (Ducklings) | Dashboard API inline queries, low-latency reads | 128 MB memory limit |
| CF Containers | Heavy aggregations, full DuckDB feature set | Container cold start |
| MotherDuck via Hyperdrive | Shared analytical workspace, Postgres wire protocol | External dependency |

Workers Analytics Engine handles high-frequency counters that don’t need long-term Iceberg retention:

  • Clicks, impressions, page views (20 blobs + 20 doubles per data point)
  • 3-month retention, automatic sampling at high volume
  • Grafana integration for operational dashboards
  • Writes happen in parallel with Pipeline stream pushes — not sequentially

| Bucket | BFO Class | Pipeline answer |
| --- | --- | --- |
| WHAT | Process | Worker enrichment → Pipeline streaming → Iceberg retention → DuckDB aggregation |
| WHO | Material Entity | Workers (EngineeredSystem), Shopify API, CF Pipelines, DuckDB |
| WHEN | Temporal Region | Webhooks (real-time), daily cron (bulk sync), hourly cron (measurement refresh) |
| WHERE | Site | Shopify API, CF Workers, PG (domain), R2 (analytics), Pipeline streams |
| HOW WE KNOW | GDC/ICE | JSONL archives on R2, Iceberg telemetry tables, sync telemetry audit trail |
| HOW IT IS | Quality | Watermark position (DO), Workflow step status, stream throughput |
| WHY | Role/Disposition | Webhook Function, Scheduled Function (cron), data currency Disposition |