
Pipeline

The data pipeline follows the warehouse pattern raw → core → mart, replacing Supabase Edge Functions and materialized views with CF Workers + Queues + Cron. Each stage is a ProcedureExecution (PKO) with ordered Steps, connected by the BFO precedes relation.

[BFO: Process chain linked by precedes | PKO: ProcedureExecution with Steps | IAO: IBEs → typed Measurement Data]

Shopify Admin API (GraphQL Bulk)
         │
┌────────────────────┐
│ raw layer          │  commercial_transaction_record, transaction_participation_record
│ (IBE append +      │  procedure_execution_record, procedure_execution_state
│  deduplicate)      │
└────────┬───────────┘
         │ precedes
┌────────────────────┐
│ core layer         │  material_artifact, sales_measurement_dataset
│ (type + measure)   │  IBEs → typed Continuants + Measurement Data
└────────┬───────────┘
         │ precedes
┌────────────────────┐
│ mart layer         │  performance_measurement_dataset
│ (aggregate +       │  Objective Specifications → Nominal Classifications
│  classify)         │
└────────────────────┘
         │
Dashboard API → React Frontend

[PKO: ProcedureExecution | Steps: initiate → poll → download → parse → upsert → advance watermark]

Triggered by the Daily Shopify Bulk Sync scenario. The Ingest Worker (EngineeredSystem, IOF) executes a Procedure with six ordered Steps:

  1. Step: Initiate — Cron fires → enqueues PROCEDURE_EXECUTION_INITIATE. Ingest Worker creates a procedure_execution_record row with step = 'initiated'
  2. Step: Request — Initiates Shopify bulkOperationRunQuery (GraphQL). Updates step to 'requesting'
  3. Step: Poll — Self-enqueues PROCEDURE_EXECUTION_POLL messages until bulk operation completes. Updates step to 'polling'
  4. Step: Download — Downloads JSONL results, archives to R2 (Information Bearing Entity) for audit/replay. Updates step to 'downloading'
  5. Step: Parse + Upsert — Parses JSONL and upserts into commercial_transaction_record and transaction_participation_record in chunked batches. Updates step to 'upserting'
  6. Step: Advance Watermark — Updates procedure_execution_state with new cursor position. Marks execution 'completed'
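The six ordered Steps form a small state machine driven by queue messages. A minimal sketch of the step-advance logic (names and shapes are illustrative, not the actual Worker code):

```typescript
// Ordered Steps of the ingest ProcedureExecution; the BFO "precedes"
// relation is encoded by array order.
type Step =
  | "initiated" | "requesting" | "polling"
  | "downloading" | "upserting" | "completed";

const STEPS: Step[] = [
  "initiated", "requesting", "polling",
  "downloading", "upserting", "completed",
];

// Advance to the next Step. While the Shopify bulk operation is still
// running, "polling" self-enqueues another POLL message and stays put.
function nextStep(current: Step, bulkDone = true): Step {
  if (current === "polling" && !bulkDone) return "polling";
  const i = STEPS.indexOf(current);
  return STEPS[Math.min(i + 1, STEPS.length - 1)]; // "completed" is terminal
}
```

Each queue consumer invocation would update `procedure_execution_record.step` to `nextStep(...)` after its work succeeds, so a crashed invocation simply re-runs the current Step.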

[BFO: Transformation Process | IAO: IBEs → typed Measurement Data and MaterialArtifacts]

After raw data lands (the raw ProcedureExecution precedes this stage):

  1. material_artifact — Upsert product metadata (title, status, vendor, tags) from transaction participation records + tag sync. Each row is an Independent Continuant (BFO) / MaterialArtifact (IOF) — a physical product with typed Qualities.

  2. sales_measurement_dataset — Aggregate participation record quantities by (artifact_identifier, day). Each row is a Measurement Datum (IAO) on a ratio scale, anchored to a Temporal Region (calendar day).

INSERT INTO sales_measurement_dataset (artifact_identifier, day, units, units_net)
SELECT artifact_identifier, DATE(order_initiated_at), SUM(quantity), SUM(quantity)
FROM transaction_participation_record
WHERE order_initiated_at >= ? -- watermark from procedure_execution_state
GROUP BY artifact_identifier, DATE(order_initiated_at)
ON DUPLICATE KEY UPDATE
units = VALUES(units),
units_net = VALUES(units_net);
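The same (artifact_identifier, day) aggregation can be sketched in TypeScript, e.g. for unit-testing the grouping logic before it reaches SQL (record shape and watermark handling are assumptions):

```typescript
interface ParticipationRecord {
  artifact_identifier: string;
  order_initiated_at: string; // ISO 8601 timestamp
  quantity: number;
}

// Mirrors GROUP BY (artifact_identifier, DATE(order_initiated_at))
// with the watermark applied as a lower bound, like the WHERE clause above.
function aggregateDaily(
  records: ParticipationRecord[],
  watermark: string,
): Map<string, number> {
  const totals = new Map<string, number>();
  for (const r of records) {
    if (r.order_initiated_at < watermark) continue; // ISO strings sort lexicographically
    const day = r.order_initiated_at.slice(0, 10);  // DATE(order_initiated_at)
    const key = `${r.artifact_identifier}|${day}`;
    totals.set(key, (totals.get(key) ?? 0) + r.quantity);
  }
  return totals;
}
```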

[BFO: Aggregation Process | CCO: Nominal Measurement derived from IAO Objective Specification]

Replaces PostgreSQL REFRESH MATERIALIZED VIEW with explicit table writes. This is where REA Typification happens: the Objective Specification (policy/type layer) governs Nominal Classification of individual MaterialArtifacts (instance layer).

Nominal classification logic (from objective_specification thresholds):

Nominal Classification | Rule                                                                         | CCO Measurement Scale
-----------------------|------------------------------------------------------------------------------|------------------------------------
top_riser              | sales_first_7_days >= threshold_7d AND sales_first_14_days >= threshold_14d  | Nominal (derived from ratio)
winner                 | sales_last_30_days >= threshold_30d                                          | Nominal (derived from ratio)
banana                 | Top N by aggregate_measurement_value (N from banana_count)                   | Nominal (derived from ordinal rank)
archive_candidate      | is_active = false or business-defined staleness rules                        | Nominal
none                   | Default: does not meet any classification criteria                           | Nominal
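A sketch of the classifier, with the `objective_specification` thresholds passed in as the policy layer. The rule order encodes precedence here (top_riser before winner before banana), which is an assumption; field names mirror the table:

```typescript
interface ObjectiveSpecification {
  threshold_7d: number;
  threshold_14d: number;
  threshold_30d: number;
}

interface ArtifactWindows {
  sales_first_7_days: number;
  sales_first_14_days: number;
  sales_last_30_days: number;
  is_active: boolean;
  is_top_n: boolean; // precomputed Top-N rank by aggregate_measurement_value
}

type Classification =
  | "top_riser" | "winner" | "banana" | "archive_candidate" | "none";

// Nominal Classification of one MaterialArtifact instance, governed by
// the Objective Specification. First matching rule wins (assumed order).
function classify(a: ArtifactWindows, spec: ObjectiveSpecification): Classification {
  if (a.sales_first_7_days >= spec.threshold_7d &&
      a.sales_first_14_days >= spec.threshold_14d) return "top_riser";
  if (a.sales_last_30_days >= spec.threshold_30d) return "winner";
  if (a.is_top_n) return "banana";
  if (!a.is_active) return "archive_candidate";
  return "none";
}
```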

Refresh ProcedureExecution:

  1. Step: Query — Read sales_measurement_dataset aggregates joined with material_artifact
  2. Step: Compute — Calculate 7-day, 14-day, 30-day windows and aggregate measurement values
  3. Step: Classify — Apply Nominal Classification logic against current objective_specification
  4. Step: Upsert — Write into performance_measurement_dataset in batches
  5. Step: Finalize — Update last_refreshed timestamp

Schedule: Hourly via Cron Trigger. The hourly frequency balances freshness with PlanetScale resource usage.
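In Cloudflare terms, both schedules are declared per Worker via Cron Triggers in `wrangler.toml`. A minimal sketch (the exact times are illustrative, not taken from the deployment):

```toml
# ingest-worker/wrangler.toml — daily raw-layer sync (assumed 06:00 UTC)
[triggers]
crons = ["0 6 * * *"]

# mart-worker/wrangler.toml — hourly refresh ProcedureExecution
# [triggers]
# crons = ["0 * * * *"]
```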

[PKO: ExecutionStatus | BFO: precedes — each execution picks up where the previous left off]

procedure_execution_state stores cursor positions to avoid reprocessing. Each ProcedureExecution reads the watermark from the previous execution (BFO precedes relation):

{
  "key": "orders_last_watermark",
  "value_json": { "ts": "2026-02-19T06:00:00Z" }
}
  • Hard floor: 128 days back — watermarks older than this reset to NOW() - 128 days
  • Advanced after successful ProcedureExecution completion only
  • Used as updated_at filter in Shopify bulk queries
  • Enables incremental sync (only new/updated commercial transactions since last watermark)
  • Historical loads beyond 128 days use a separate full backfill workflow
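The hard-floor rule can be sketched as a pure clamp over the stored watermark (function name and shape are assumptions; the real Worker reads `value_json` from `procedure_execution_state`):

```typescript
const HARD_FLOOR_DAYS = 128;

// Clamp a stored watermark to the 128-day hard floor: anything older
// resets to NOW() - 128 days before being used as the updated_at filter.
function effectiveWatermark(stored: string, now: Date): string {
  const floor = new Date(now.getTime() - HARD_FLOOR_DAYS * 24 * 60 * 60 * 1000);
  const w = new Date(stored);
  return (w < floor ? floor : w).toISOString();
}
```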
Bucket      | BFO Class        | Pipeline answer
------------|------------------|---------------------------------------------------------------------------
WHAT        | Process          | Three-stage data acquisition, transformation, and aggregation
WHO         | Material Entity  | Ingest Worker (EngineeredSystem), Shopify API (external EngineeredSystem)
WHEN        | Temporal Region  | Daily cron (raw), immediately after raw (core), hourly cron (mart)
WHERE       | Site             | Shopify API endpoint, CF Queue, PlanetScale tables, R2 bucket
HOW WE KNOW | GDC/ICE          | JSONL archives (IBEs), procedure_execution_record, watermark state
HOW IT IS   | Quality          | Execution status, watermark position, record counts, freshness
WHY         | Role/Disposition | Scheduled Function (cron), data currency Disposition