Pipeline
Flow overview
The data pipeline follows the warehouse pattern raw → core → mart, replacing Supabase Edge Functions and materialized views with Cloudflare Workers, Queues, and Cron Triggers. Each stage is a ProcedureExecution (PKO) with ordered Steps, and the stages are linked by the BFO precedes relation.
[BFO: Process chain linked by precedes | PKO: ProcedureExecution with Steps | IAO: IBEs → typed Measurement Data]
```text
Shopify Admin API (GraphQL Bulk)
         │
         ▼
┌────────────────────┐
│ raw layer          │  commercial_transaction_record, transaction_participation_record
│ (IBE append +      │  procedure_execution_record, procedure_execution_state
│  deduplicate)      │
└────────┬───────────┘
         │ precedes
         ▼
┌────────────────────┐
│ core layer         │  material_artifact, sales_measurement_dataset
│ (type + measure)   │  IBEs → typed Continuants + Measurement Data
└────────┬───────────┘
         │ precedes
         ▼
┌────────────────────┐
│ mart layer         │  performance_measurement_dataset
│ (aggregate +       │  Objective Specifications → Nominal Classifications
│  classify)         │
└────────────────────┘
         │
         ▼
  Dashboard API → React Frontend
```

Raw layer: ingest ProcedureExecution
[PKO: ProcedureExecution | Steps: initiate → request → poll → download → parse + upsert → advance watermark]
Triggered by the Daily Shopify Bulk Sync scenario. The Ingest Worker (EngineeredSystem, IOF) executes a Procedure with six ordered Steps:
- Step: Initiate — Cron fires → enqueues `PROCEDURE_EXECUTION_INITIATE`. Ingest Worker creates a `procedure_execution_record` row with `step = 'initiated'`
- Step: Request — Initiates Shopify `bulkOperationRunQuery` (GraphQL). Updates step to `'requesting'`
- Step: Poll — Self-enqueues `PROCEDURE_EXECUTION_POLL` messages until the bulk operation completes. Updates step to `'polling'`
- Step: Download — Downloads JSONL results and archives them to R2 (Information Bearing Entity) for audit/replay. Updates step to `'downloading'`
- Step: Parse + Upsert — Parses JSONL and upserts into `commercial_transaction_record` and `transaction_participation_record` in chunked batches. Updates step to `'upserting'`
- Step: Advance Watermark — Updates `procedure_execution_state` with the new cursor position. Marks the execution `'completed'`
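The Step sequence above behaves like a small state machine in which only the Poll step loops. Below is a minimal TypeScript sketch of that transition logic, assuming the `step` strings listed above and a subset of Shopify's `BulkOperationStatus` values. The `onPoll` function, the `PollMessage` fields beyond `type`, the 30-second delay, and the 120-attempt cap are hypothetical and not taken from the Ingest Worker.

```typescript
// `step` values written to procedure_execution_record, in execution order.
const STEPS = ["initiated", "requesting", "polling", "downloading", "upserting", "completed"] as const;
type Step = (typeof STEPS)[number];

// Subset of Shopify's BulkOperationStatus enum that a poller has to handle.
type BulkStatus = "CREATED" | "RUNNING" | "COMPLETED" | "FAILED" | "CANCELED" | "EXPIRED";

// Self-enqueued poll message; fields other than `type` are hypothetical.
interface PollMessage {
  type: "PROCEDURE_EXECUTION_POLL";
  executionId: string;
  attempt: number;
}

type PollDecision =
  | { action: "requeue"; message: PollMessage; delaySeconds: number }
  | { action: "advance"; step: Step }
  | { action: "fail"; reason: string };

// Returns the Step that follows `current`; "completed" is terminal.
function nextStep(current: Step): Step {
  const i = STEPS.indexOf(current);
  if (i === STEPS.length - 1) throw new Error(`"${current}" is terminal`);
  return STEPS[i + 1];
}

// Decides what the Poll Step does with one status reading of the bulk operation.
function onPoll(executionId: string, attempt: number, status: BulkStatus, maxAttempts = 120): PollDecision {
  if (status === "COMPLETED") {
    // Results are ready: move on to the Download Step.
    return { action: "advance", step: nextStep("polling") };
  }
  if (status === "CREATED" || status === "RUNNING") {
    if (attempt >= maxAttempts) return { action: "fail", reason: "poll attempts exhausted" };
    return {
      action: "requeue",
      message: { type: "PROCEDURE_EXECUTION_POLL", executionId, attempt: attempt + 1 },
      delaySeconds: 30,
    };
  }
  // FAILED, CANCELED, EXPIRED: this execution can no longer reach "completed".
  return { action: "fail", reason: `bulk operation ${status}` };
}
```

Keeping the decision pure means the Worker only has to translate a `requeue` result into a delayed queue send and an `advance` result into a `step` update, and the loop can be tested without Shopify or Queues.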
Core layer: Transformation Process
[BFO: Transformation Process | IAO: IBEs → typed Measurement Data and MaterialArtifacts]
After raw data lands (the raw ProcedureExecution precedes this stage):
- `material_artifact` — Upsert product metadata (title, status, vendor, tags) from transaction participation records + tag sync. Each row is an Independent Continuant (BFO) / MaterialArtifact (IOF) — a physical product with typed Qualities.
- `sales_measurement_dataset` — Aggregate participation record quantities by `(artifact_identifier, day)`. Each row is a Measurement Datum (IAO) on a ratio scale, anchored to a Temporal Region (calendar day).
```sql
INSERT INTO sales_measurement_dataset (artifact_identifier, day, units, units_net)
SELECT artifact_identifier, DATE(order_initiated_at), SUM(quantity), SUM(quantity)
FROM transaction_participation_record
-- watermark from procedure_execution_state, truncated to midnight so that a
-- mid-day watermark does not overwrite a full day's total with a partial sum
WHERE order_initiated_at >= DATE(?)
GROUP BY artifact_identifier, DATE(order_initiated_at)
ON DUPLICATE KEY UPDATE units = VALUES(units), units_net = VALUES(units_net);
```

Mart refresh: Aggregation Process
[BFO: Aggregation Process | CCO: Nominal Measurement derived from IAO Objective Specification]
This stage replaces PostgreSQL `REFRESH MATERIALIZED VIEW` with explicit table writes. It is also where REA Typification happens: the Objective Specification (policy/type layer) governs the Nominal Classification of individual MaterialArtifacts (instance layer).
Nominal classification logic (thresholds read from `objective_specification`):
| Nominal Classification | Rule | CCO Measurement Scale |
|---|---|---|
| `top_riser` | `sales_first_7_days >= threshold_7d` AND `sales_first_14_days >= threshold_14d` | Nominal (derived from ratio) |
| `winner` | `sales_last_30_days >= threshold_30d` | Nominal (derived from ratio) |
| `banana` | Top N by `aggregate_measurement_value` (N from `banana_count`) | Nominal (derived from ordinal rank) |
| `archive_candidate` | `is_active = false` or business-defined staleness rules | Nominal |
| `none` | Default — does not meet any classification criteria | Nominal |
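The rules in the table can be applied in a single pass over the mart rows. This TypeScript sketch assumes the threshold field names shown in the table and treats table order as precedence (the first matching rule wins). It also breaks `banana` ties by `artifact_identifier` and checks only `is_active` for `archive_candidate`. The precedence, tie-breaking, and interface shapes are assumptions, not documented behavior.

```typescript
// Threshold fields named after the table; the real objective_specification layout is assumed.
interface ObjectiveSpecification {
  threshold_7d: number;
  threshold_14d: number;
  threshold_30d: number;
  banana_count: number;
}

// One artifact's computed metrics (output of the Compute Step).
interface ArtifactMetrics {
  artifact_identifier: string;
  is_active: boolean;
  sales_first_7_days: number;
  sales_first_14_days: number;
  sales_last_30_days: number;
  aggregate_measurement_value: number;
}

type NominalClassification = "top_riser" | "winner" | "banana" | "archive_candidate" | "none";

function classifyAll(rows: ArtifactMetrics[], spec: ObjectiveSpecification): Map<string, NominalClassification> {
  // `banana` is rank-based, so the top-N set is computed over the whole population first.
  const ranked = [...rows].sort(
    (a, b) =>
      b.aggregate_measurement_value - a.aggregate_measurement_value ||
      a.artifact_identifier.localeCompare(b.artifact_identifier),
  );
  const bananas = new Set(
    ranked.slice(0, Math.max(0, spec.banana_count)).map((r) => r.artifact_identifier),
  );

  const result = new Map<string, NominalClassification>();
  for (const r of rows) {
    // Assumed precedence: table order, first match wins; otherwise "none".
    let label: NominalClassification = "none";
    if (r.sales_first_7_days >= spec.threshold_7d && r.sales_first_14_days >= spec.threshold_14d) {
      label = "top_riser";
    } else if (r.sales_last_30_days >= spec.threshold_30d) {
      label = "winner";
    } else if (bananas.has(r.artifact_identifier)) {
      label = "banana";
    } else if (!r.is_active) {
      label = "archive_candidate"; // business-defined staleness rules omitted
    }
    result.set(r.artifact_identifier, label);
  }
  return result;
}
```

Because the Objective Specification is passed in rather than hard-coded, changing a threshold changes the classification on the next hourly refresh without a code change.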
Refresh ProcedureExecution:
- Step: Query — Read `sales_measurement_dataset` aggregates joined with `material_artifact`
- Step: Compute — Calculate 7-day, 14-day, 30-day windows and aggregate measurement values
- Step: Classify — Apply Nominal Classification logic against the current `objective_specification`
- Step: Upsert — Write into `performance_measurement_dataset` in batches
- Step: Finalize — Update the `last_refreshed` timestamp
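The Compute Step's window names can be read in more than one way. The TypeScript sketch below uses one reading: "first N days" counted from an artifact's first sales day in `sales_measurement_dataset`, "last 30 days" as the 30 calendar days ending on the refresh day, and `aggregate_measurement_value` as total `units`. All three anchors are assumptions; the document does not define them.

```typescript
// One row of sales_measurement_dataset; `day` is an ISO date such as "2026-02-01".
interface DailySales {
  artifact_identifier: string;
  day: string;
  units: number;
}

interface WindowMetrics {
  sales_first_7_days: number;
  sales_first_14_days: number;
  sales_last_30_days: number;
  aggregate_measurement_value: number;
}

const MS_PER_DAY = 86_400_000;

// Whole days since the Unix epoch, treating `day` as a UTC calendar date.
function epochDay(day: string): number {
  return Math.floor(Date.parse(`${day}T00:00:00Z`) / MS_PER_DAY);
}

function computeWindows(rows: DailySales[], asOfDay: string): Map<string, WindowMetrics> {
  const asOf = epochDay(asOfDay);

  // Group by artifact, ignoring any days after the refresh date.
  const byArtifact = new Map<string, { day: number; units: number }[]>();
  for (const r of rows) {
    const day = epochDay(r.day);
    if (day > asOf) continue;
    const list = byArtifact.get(r.artifact_identifier) ?? [];
    list.push({ day, units: r.units });
    byArtifact.set(r.artifact_identifier, list);
  }

  const out = new Map<string, WindowMetrics>();
  for (const [id, list] of byArtifact) {
    // Assumed anchor for the "first N days" windows: the artifact's first day with sales.
    const first = list.reduce((min, r) => Math.min(min, r.day), Infinity);
    const m: WindowMetrics = {
      sales_first_7_days: 0,
      sales_first_14_days: 0,
      sales_last_30_days: 0,
      aggregate_measurement_value: 0,
    };
    for (const { day, units } of list) {
      if (day - first < 7) m.sales_first_7_days += units;
      if (day - first < 14) m.sales_first_14_days += units;
      if (asOf - day < 30) m.sales_last_30_days += units; // 30 days ending on asOfDay
      m.aggregate_measurement_value += units;
    }
    out.set(id, m);
  }
  return out;
}
```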
Schedule: Hourly via Cron Trigger. The hourly frequency balances freshness with PlanetScale resource usage.
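A Worker with several Cron Triggers receives the matching expression as `controller.cron` in its `scheduled()` handler, so one handler can route to either stage. A minimal TypeScript sketch of that routing follows. The daily expression `0 6 * * *` and the `MART_REFRESH` message type are hypothetical, since the document specifies only "daily" for the raw layer and "hourly" for the mart.

```typescript
// Hypothetical cron expressions: the document says only "daily" (raw) and "hourly" (mart).
const DAILY_RAW = "0 6 * * *";
const HOURLY_MART = "0 * * * *";

type StageMessage =
  | { type: "PROCEDURE_EXECUTION_INITIATE"; scheduledTime: number }
  | { type: "MART_REFRESH"; scheduledTime: number }; // hypothetical message type

// Maps the cron expression that fired to the message that starts the
// corresponding ProcedureExecution. The core layer is not listed because it
// runs immediately after the raw layer rather than on its own schedule.
function messageForCron(cron: string, scheduledTime: number): StageMessage {
  switch (cron) {
    case DAILY_RAW:
      return { type: "PROCEDURE_EXECUTION_INITIATE", scheduledTime };
    case HOURLY_MART:
      return { type: "MART_REFRESH", scheduledTime };
    default:
      throw new Error(`no stage registered for cron "${cron}"`);
  }
}
```

Inside `scheduled(controller, env, ctx)`, the Worker would pass `controller.cron` and `controller.scheduledTime` to this function and send the result to its Queue binding.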
Watermark management
[PKO: ExecutionStatus | BFO: precedes — each execution picks up where the previous left off]
`procedure_execution_state` stores cursor positions to avoid reprocessing. Each ProcedureExecution reads the watermark left by the previous execution (BFO precedes relation):

```json
{
  "key": "orders_last_watermark",
  "value_json": { "ts": "2026-02-19T06:00:00Z" }
}
```

- Hard floor: 128 days back; watermarks older than this reset to `NOW() - 128 days`
- Advanced only after a ProcedureExecution completes successfully
- Used as the `updated_at` filter in Shopify bulk queries
- Enables incremental sync (only commercial transactions created or updated since the last watermark)
- Historical loads beyond 128 days use a separate full backfill workflow
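The watermark rules above (128-day floor, advance only on success) can be sketched as two pure TypeScript functions over the `procedure_execution_state` row shape from the JSON example. Refusing to move the watermark backwards is an added assumption, and the caller is assumed to supply the new cursor value; the document does not say which timestamp the Worker records.

```typescript
// Row shape taken from the orders_last_watermark example above.
interface WatermarkRow {
  key: string;
  value_json: { ts: string };
}

const FLOOR_DAYS = 128;
const MS_PER_DAY = 86_400_000;

// Lower bound for the Shopify `updated_at` filter: the stored watermark,
// clamped so it is never older than NOW() - 128 days.
function effectiveWatermark(row: WatermarkRow | undefined, now: Date): Date {
  const floor = new Date(now.getTime() - FLOOR_DAYS * MS_PER_DAY);
  if (row === undefined) return floor; // first run: start at the floor
  const stored = Date.parse(row.value_json.ts);
  if (Number.isNaN(stored) || stored < floor.getTime()) return floor;
  return new Date(stored);
}

// Returns the row to persist after an execution. Only a completed execution
// moves the watermark, and (by assumption) it never moves backwards.
function advanceWatermark(row: WatermarkRow, candidate: Date, outcome: "completed" | "failed"): WatermarkRow {
  if (outcome !== "completed") return row;
  const current = Date.parse(row.value_json.ts);
  if (!Number.isNaN(current) && candidate.getTime() <= current) return row;
  return { key: row.key, value_json: { ts: candidate.toISOString() } };
}
```

Leaving the row untouched on failure means the next daily run re-requests the same window, which the raw layer's deduplicating upsert is meant to absorb.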
Pipeline as BFO 7 Buckets
| Bucket | BFO Class | Pipeline answer |
|---|---|---|
| WHAT | Process | Three-stage data acquisition, transformation, and aggregation |
| WHO | Material Entity | Ingest Worker (EngineeredSystem), Shopify API (external EngineeredSystem) |
| WHEN | Temporal Region | Daily cron (raw), immediately after raw (core), hourly cron (mart) |
| WHERE | Site | Shopify API endpoint, CF Queue, PlanetScale tables, R2 bucket |
| HOW WE KNOW | GDC/ICE | JSONL archives (IBEs), procedure_execution_record, watermark state |
| HOW IT IS | Quality | Execution status, watermark position, record counts, freshness |
| WHY | Role/Disposition | Scheduled Function (cron), data currency Disposition |