Pipeline
Flow overview
The data pipeline follows a domain + analytics split: Workers handle PG reads/writes for domain operations, then push enriched events to Cloudflare Pipelines for analytical retention on R2. Pipeline orchestration uses CF Workflows + Durable Objects — no pipeline state tables in PG.
[BFO: Process chain linked by precedes | PKO: ProcedureExecution with Steps | IAO: IBEs → typed Measurement Data]
```
Source events (Shopify webhooks, platform interactions, cron sync)
          │
          ▼
┌────────────────────────────────────┐
│ Worker enrichment                  │  PG reads/writes (domain operations)
│ (order_header, product, payment,   │  + event denormalization
│  publisher, fulfillment tables)    │
└─────────┬──────────────────────────┘
          │ enriched events
          ▼
┌────────────────────────────────────┐
│ Cloudflare Pipelines               │  SQL transforms per stream
│ Three telemetry streams            │  → R2 Data Catalog (Iceberg/Parquet)
│ (ADR-008)                          │
└─────────┬──────────────────────────┘
          │ Iceberg REST catalog
          ▼
┌────────────────────────────────────┐
│ DuckDB over R2                     │  Sliding windows, aggregations,
│ (WASM / Container / MotherDuck)    │  nominal classifications
└────────────────────────────────────┘

┌────────────────────────────────────┐
│ Workers Analytics Engine           │  Real-time counters (clicks,
│ (parallel, not sequential)         │  impressions, page views)
└────────────────────────────────────┘
```

See ADR-006 through ADR-009 for the rationale behind this architecture.
Worker enrichment layer
Workers are the active processing layer. They perform domain operations (PG reads/writes) before pushing enriched events to Pipeline streams (ADR-007). Events arrive on R2 already denormalized — analytical queries never need to join back to PG.
Commerce events
Triggered by Shopify webhooks or the Daily Shopify Bulk Sync scenario, and processed in four steps (see the sketch after this list):
- Receive — Webhook or cron trigger arrives at Worker
- Domain write — Upsert into PG domain tables (order_header, order_line_item, product, payment, etc.)
- Enrich — Read back related data (customer, product metadata, fulfillment state) to denormalize
- Stream push — Push enriched event to COMMERCE_TELEMETRY Pipeline
Events: order_created, order_completed, payment_captured, payment_refunded, fulfillment_shipped, return_requested
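A minimal sketch of this path, assuming a Hyperdrive binding for PG, a Pipelines binding named COMMERCE_TELEMETRY, the postgres client, and an illustrative schema; none of these identifiers are confirmed project names:

```typescript
// Illustrative sketch only: binding names, columns, and the `postgres`
// client are assumptions, not the project's actual code.
import postgres from "postgres";

interface Env {
  HYPERDRIVE: Hyperdrive;       // PG connection for domain reads/writes
  COMMERCE_TELEMETRY: Pipeline; // Cloudflare Pipelines binding
}

export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    const order = (await req.json()) as { id: string; total_price: string };
    const sql = postgres(env.HYPERDRIVE.connectionString);

    // Domain write: upsert into the PG domain table.
    await sql`
      INSERT INTO order_header (shopify_id, total_amount, updated_at)
      VALUES (${order.id}, ${order.total_price}, now())
      ON CONFLICT (shopify_id)
      DO UPDATE SET total_amount = EXCLUDED.total_amount, updated_at = now()`;

    // Enrich: read back related rows so the event lands on R2 denormalized.
    const lineItems = await sql`
      SELECT li.sku, li.quantity, p.title, p.vendor
      FROM order_line_item li
      JOIN product p ON p.sku = li.sku
      WHERE li.order_shopify_id = ${order.id}`;

    // Stream push: the event carries its full context, so analytical
    // queries over Iceberg never join back to PG.
    await env.COMMERCE_TELEMETRY.send([{
      event: "order_created",
      order_id: order.id,
      line_items: [...lineItems],
      occurred_at: new Date().toISOString(),
    }]);

    return new Response("ok");
  },
} satisfies ExportedHandler<Env>;
```

Pushing the already-joined row trades a little write-path work for join-free reads on the analytics side.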
Publication events
Triggered by platform interactions (clicks, impressions, views); a sketch of the branching follows the list:
- Receive — Platform worker captures visitor interaction
- Stream push — Push to PUBLICATION_TELEMETRY Pipeline (may skip PG entirely for raw engagement events)
- Domain write (conditional) — Only writes to PG publication_telemetry for events that need domain-level tracking (referrals, conversions, attribution)
High-volume stream — Workers Analytics Engine handles real-time aggregation in parallel.
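A sketch of the conditional write, with hypothetical binding, helper, and field names (PUBLICATION_TELEMETRY, writePublicationTelemetryRow):

```typescript
// Illustrative only: binding, helper, and field names are assumptions.
interface Interaction {
  type: "click" | "impression" | "view" | "engagement" | "conversion" | "referral";
  publisher_id: string;
  item_id: string;
}

interface Env {
  PUBLICATION_TELEMETRY: Pipeline;
}

// Hypothetical PG upsert for attribution-relevant events.
declare function writePublicationTelemetryRow(evt: Interaction, env: Env): Promise<void>;

const NEEDS_DOMAIN_WRITE = new Set<Interaction["type"]>(["referral", "conversion"]);

export function handleInteraction(evt: Interaction, env: Env, ctx: ExecutionContext) {
  // Every event goes to the Pipeline stream.
  const tasks: Promise<unknown>[] = [
    env.PUBLICATION_TELEMETRY.send([{ ...evt, occurred_at: new Date().toISOString() }]),
  ];

  // Domain write (conditional): only attribution-relevant events touch PG.
  if (NEEDS_DOMAIN_WRITE.has(evt.type)) {
    tasks.push(writePublicationTelemetryRow(evt, env));
  }

  // Neither write blocks the response to the visitor.
  ctx.waitUntil(Promise.all(tasks));
}
```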
Sync events
Emitted by CF Workflows at procedure boundaries:
- Workflow step completes — Record completion metadata (record counts, watermark positions, durations)
- Stream push — Push to SYNC_TELEMETRY Pipeline for long-term audit retention on R2
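The record shape below is an assumption of what a SYNC_TELEMETRY event might carry, inferred from the metadata listed above:

```typescript
// Illustrative record shape; field names are assumptions.
interface SyncTelemetryEvent {
  workflow: string;        // e.g. "shopify-bulk-sync"
  step: string;            // e.g. "parse-upsert"
  records_written: number;
  watermark: string;       // watermark position the step advanced to (ISO 8601)
  duration_ms: number;
  completed_at: string;
}

// Pushed at each step boundary (SYNC_TELEMETRY as a Pipelines binding).
async function recordStepCompletion(env: { SYNC_TELEMETRY: Pipeline }, evt: SyncTelemetryEvent) {
  await env.SYNC_TELEMETRY.send([evt]);
}
```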
Telemetry streams (Cloudflare Pipelines)
Per ADR-008, three dedicated Pipeline streams with independent SQL transforms:
| Stream | Events | Volume | Iceberg table |
|---|---|---|---|
| COMMERCE_TELEMETRY | order_created, order_completed, payment_captured, payment_refunded, fulfillment_shipped, return_requested | Medium (order throughput) | commerce.telemetry |
| PUBLICATION_TELEMETRY | click, impression, view, engagement, conversion — enriched with publisher/item/program context | High (every visitor interaction) | publication.telemetry |
| SYNC_TELEMETRY | procedure completions with record counts, watermark positions, durations | Low (operational audit trail) | sync.telemetry |
Pipeline SQL transforms are stateless, row-level operations (field extraction, type coercion, timestamp normalization). Complex aggregation happens downstream in DuckDB.
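The transforms themselves are SQL in the Pipeline configuration, but their per-row semantics are roughly what this TypeScript sketch expresses (field names assumed):

```typescript
// Row-level semantics of a stream transform, for illustration only;
// the actual transform is SQL, not Worker code.
function transformRow(raw: Record<string, unknown>) {
  return {
    event: String(raw.event),                                     // field extraction
    order_id: String(raw.order_id ?? ""),                         // field extraction
    total_amount: Number(raw.total_amount ?? 0),                  // type coercion
    occurred_at: new Date(String(raw.occurred_at)).toISOString(), // timestamp normalization
  };
}
```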
Iceberg tables on R2
| Iceberg table | Ontological type | What it holds | Partitioning |
|---|---|---|---|
| commerce.telemetry | Process Boundary (BFO) | Commerce lifecycle events | Daily |
| publication.telemetry | Process Boundary (BFO) | Engagement events | Hourly (high volume) |
| sync.telemetry | Process Record (IAO) | Pipeline execution audit trail | Daily |
| measurement.sales | Measurement Datum (IAO) | Daily sales aggregates by product × day (ratio scale) | Daily |
| measurement.performance | Dataset (IAO) | Aggregated metrics with sliding windows + nominal classification | Daily |
| configuration.objective_spec | Objective Specification (IAO) | Classification thresholds (replicated from PG macrodata_artifact) | Unpartitioned |
Pipeline orchestration (CF Workflows)
Per ADR-009, all pipeline orchestration uses Cloudflare Workflows — not PG tables.
Shopify Bulk Sync Workflow
The Daily Shopify Bulk Sync is a multi-step Workflow (sketched in code after the list):
- Step: Initiate — Cron fires → Workflow starts. Reads watermark from Durable Object
- Step: Request — Initiates Shopify bulkOperationRunQuery (GraphQL)
- Step: Poll — Workflow sleeps/retries until bulk operation completes
- Step: Download — Downloads JSONL results, archives to R2 for audit/replay
- Step: Parse + Upsert — Parses JSONL and upserts into PG domain tables (order_header, order_line_item, product) in chunked batches
- Step: Enrich + Stream — Pushes enriched events to COMMERCE_TELEMETRY Pipeline
- Step: Advance Watermark — Updates watermark in Durable Object storage
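Condensed into the Cloudflare Workflows API (WorkflowEntrypoint, step.do, step.sleep); the helper functions are hypothetical placeholders and the real step bodies are more involved:

```typescript
// Sketch only: Env contents and all declared helpers are assumptions.
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from "cloudflare:workers";

interface Env {} // bindings (Hyperdrive, Pipelines, DO namespace) omitted

declare function readWatermark(env: Env): Promise<string>;
declare function startBulkQuery(env: Env, since: string): Promise<string>;
declare function checkBulkOperation(env: Env, opId: string): Promise<string | null>;
declare function archiveJsonl(env: Env, url: string): Promise<string>;
declare function upsertDomainTables(env: Env, r2Key: string): Promise<void>;
declare function pushCommerceEvents(env: Env, r2Key: string): Promise<void>;
declare function advanceWatermark(env: Env): Promise<void>;

export class ShopifyBulkSync extends WorkflowEntrypoint<Env> {
  async run(event: WorkflowEvent<unknown>, step: WorkflowStep) {
    const since = await step.do("initiate", () => readWatermark(this.env));
    const opId = await step.do("request", () => startBulkQuery(this.env, since));

    // Poll: each attempt is its own durable step, with sleeps between.
    let resultUrl: string | null = null;
    for (let i = 0; resultUrl === null; i++) {
      resultUrl = await step.do(`poll-${i}`, () => checkBulkOperation(this.env, opId));
      if (resultUrl === null) await step.sleep(`poll-wait-${i}`, "30 seconds");
    }

    const r2Key = await step.do("download", () => archiveJsonl(this.env, resultUrl!));
    await step.do("parse-upsert", () => upsertDomainTables(this.env, r2Key));
    await step.do("enrich-stream", () => pushCommerceEvents(this.env, r2Key));
    await step.do("advance-watermark", () => advanceWatermark(this.env));
  }
}
```

Because each step.do result is checkpointed, a retry resumes after the last completed step instead of rerunning the whole sync.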
Measurement Refresh Workflow
Replaces the old PG-based mart refresh. Runs as a DuckDB query over R2 Iceberg tables:
- Step: Query — DuckDB reads commerce.telemetry Iceberg table, aggregates by product × day
- Step: Compute — Calculate 7-day, 14-day, 30-day windows and aggregate measurement values
- Step: Classify — Apply nominal classification logic against configuration.objective_spec thresholds
- Step: Write — Append/overwrite into measurement.sales and measurement.performance Iceberg tables
Schedule: Hourly via Cron Trigger → Workflow.
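For the Query/Compute steps, the window arithmetic might look like the following DuckDB SQL, embedded as a string the way a Worker or Container would issue it; the column names and window semantics are assumptions:

```typescript
// Illustrative sliding-window aggregation over commerce.telemetry;
// real column names come from the measurement specs.
const WINDOWED_SALES_SQL = `
  WITH daily AS (
    SELECT product_id,
           CAST(occurred_at AS DATE) AS day,
           SUM(total_amount)         AS sales
    FROM commerce.telemetry
    WHERE event IN ('order_created', 'order_completed')
    GROUP BY ALL
  )
  SELECT product_id, day,
         SUM(sales) OVER w7  AS sales_7d,
         SUM(sales) OVER w14 AS sales_14d,
         SUM(sales) OVER w30 AS sales_30d
  FROM daily
  WINDOW
    w7  AS (PARTITION BY product_id ORDER BY day
            RANGE BETWEEN INTERVAL 6 DAYS PRECEDING AND CURRENT ROW),
    w14 AS (PARTITION BY product_id ORDER BY day
            RANGE BETWEEN INTERVAL 13 DAYS PRECEDING AND CURRENT ROW),
    w30 AS (PARTITION BY product_id ORDER BY day
            RANGE BETWEEN INTERVAL 29 DAYS PRECEDING AND CURRENT ROW)
`;
```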
Nominal classification logic
From objective_specification thresholds (stored as macrodata_artifact in PG, replicated to R2 for query co-location):
| Nominal Classification | Rule | CCO Measurement Scale |
|---|---|---|
| top_riser | sales_first_7_days >= threshold_7d AND sales_first_14_days >= threshold_14d | Nominal (derived from ratio) |
| winner | sales_last_30_days >= threshold_30d | Nominal (derived from ratio) |
| banana | Top N by aggregate_measurement_value (N from banana_count) | Nominal (derived from ordinal rank) |
| archive_candidate | is_active = false or business-defined staleness rules | Nominal |
| none | Default — does not meet any classification criteria | Nominal |
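As a sketch, the rules could be applied per product/day row as below; the precedence among classes is an assumption, and the field names mirror the table above:

```typescript
// Illustrative only: precedence order and field names are assumptions.
type Classification = "top_riser" | "winner" | "banana" | "archive_candidate" | "none";

interface Measurements {
  sales_first_7_days: number;
  sales_first_14_days: number;
  sales_last_30_days: number;
  is_active: boolean;
}

interface ObjectiveSpec {
  threshold_7d: number;
  threshold_14d: number;
  threshold_30d: number;
  banana_count: number; // N for the top-N "banana" class
}

function classify(m: Measurements, spec: ObjectiveSpec, rankByAggregate: number): Classification {
  if (!m.is_active) return "archive_candidate";
  if (m.sales_first_7_days >= spec.threshold_7d &&
      m.sales_first_14_days >= spec.threshold_14d) return "top_riser";
  if (m.sales_last_30_days >= spec.threshold_30d) return "winner";
  if (rankByAggregate <= spec.banana_count) return "banana"; // ordinal rank → nominal class
  return "none";
}
```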
Watermark management
Watermarks live in Durable Object persistent storage — not PG tables. Each Workflow reads the watermark from its associated Durable Object:
{ "key": "orders_last_watermark", "value_json": { "ts": "2026-02-19T06:00:00Z" }}- Hard floor: 128 days back — watermarks older than this reset to
NOW() - 128 days - Advanced after successful Workflow step completion only
- Used as
updated_atfilter in Shopify bulk queries - Enables incremental sync (only new/updated orders since last watermark)
- Historical loads beyond 128 days use a separate full backfill Workflow
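A minimal Durable Object sketch of this contract (class, key, and method names are illustrative):

```typescript
import { DurableObject } from "cloudflare:workers";

const FLOOR_DAYS = 128;
const DAY_MS = 24 * 60 * 60 * 1000;

// Illustrative watermark DO; class, key, and method names are assumptions.
export class SyncWatermark extends DurableObject {
  // Read the watermark, clamped to the 128-day hard floor.
  async get(key = "orders_last_watermark"): Promise<string> {
    const floor = new Date(Date.now() - FLOOR_DAYS * DAY_MS).toISOString();
    const stored = await this.ctx.storage.get<{ ts: string }>(key);
    // ISO-8601 UTC strings compare correctly as plain strings.
    return stored && stored.ts > floor ? stored.ts : floor;
  }

  // Called only after the consuming Workflow step completed successfully.
  async advance(key: string, ts: string): Promise<void> {
    await this.ctx.storage.put(key, { ts });
  }
}
```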
Analytical queries (DuckDB over R2)
DuckDB queries R2 Iceberg tables via the R2 Data Catalog REST API. Three deployment options:
| Option | Use case | Constraints |
|---|---|---|
| WASM in Workers (Ducklings) | Dashboard API inline queries, low-latency reads | 128 MB memory limit |
| CF Containers | Heavy aggregations, full DuckDB feature set | Container cold start |
| MotherDuck via Hyperdrive | Shared analytical workspace, Postgres wire protocol | External dependency |
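For example, the Container option could scan an Iceberg table with the DuckDB iceberg extension; the bucket path below is illustrative (R2 is reached via its S3-compatible endpoint), and the WASM and MotherDuck options attach the catalog differently:

```typescript
// Illustrative DuckDB statements; warehouse path and credential setup
// are assumptions, not the project's actual configuration.
const DAILY_ENGAGEMENT_SQL = `
  INSTALL iceberg;
  LOAD iceberg;
  SELECT event, COUNT(*) AS events
  FROM iceberg_scan('s3://telemetry-warehouse/publication/telemetry')
  WHERE occurred_at >= now() - INTERVAL 1 DAY
  GROUP BY event
  ORDER BY events DESC
`;
```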
Real-time counters
Workers Analytics Engine handles high-frequency counters that don’t need long-term Iceberg retention:
- Clicks, impressions, page views (20 blobs + 20 doubles per data point)
- 3-month retention, automatic sampling at high volume
- Grafana integration for operational dashboards
- Writes happen in parallel with Pipeline stream pushes — not sequentially
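The WAE write itself is a fire-and-forget writeDataPoint call; the binding name and blob/double layout below are assumptions:

```typescript
// Illustrative counter write; CLICK_COUNTERS is a hypothetical
// Analytics Engine binding.
interface Env {
  CLICK_COUNTERS: AnalyticsEngineDataset;
}

function recordInteraction(env: Env, type: string, publisherId: string, itemId: string) {
  // Non-blocking: runs alongside the Pipeline stream push, not before it.
  env.CLICK_COUNTERS.writeDataPoint({
    indexes: [publisherId], // sampling/partition key
    blobs: [type, itemId],  // up to 20 blobs per data point
    doubles: [1],           // up to 20 doubles per data point
  });
}
```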
Pipeline as BFO 7 Buckets
| Bucket | BFO Class | Pipeline answer |
|---|---|---|
| WHAT | Process | Worker enrichment → Pipeline streaming → Iceberg retention → DuckDB aggregation |
| WHO | Material Entity | Workers (EngineeredSystem), Shopify API, CF Pipelines, DuckDB |
| WHEN | Temporal Region | Webhooks (real-time), daily cron (bulk sync), hourly cron (measurement refresh) |
| WHERE | Site | Shopify API, CF Workers, PG (domain), R2 (analytics), Pipeline streams |
| HOW WE KNOW | GDC/ICE | JSONL archives on R2, Iceberg telemetry tables, sync telemetry audit trail |
| HOW IT IS | Quality | Watermark position (DO), Workflow step status, stream throughput |
| WHY | Role/Disposition | Webhook Function, Scheduled Function (cron), data currency Disposition |