# Daily Shopify Bulk Sync

## Overview
Staff-facing dashboards require fresh Measurement Data derived from commercial transaction records and transaction participation records. The integration is loosely coupled and asynchronous, implemented via Cron + Queue + background ProcedureExecution (PKO).
Goal: Execute a multi-step Procedure that produces updated `procedure_execution_record` → `material_artifact` / `sales_measurement_dataset` → `performance_measurement_dataset` tables for performance tracking and analytics.
## Participants

[CCO: Agents performing Acts | IOF: EngineeredSystems realizing Functions]
| Actor | Ontological type | Role |
|---|---|---|
| CF Cron Trigger | EngineeredSystem (IOF) bearing Function (BFO) | Fires daily at 6am UTC — realizes scheduled Function |
| Ingest Worker | EngineeredSystem (IOF) | Queue consumer, orchestrates ProcedureExecution |
| Shopify Admin API | External EngineeredSystem (IOF) | Source of commercial transaction Process Records |
| PlanetScale | EngineeredSystem (IOF) | IBE store for Process Records, Measurement Data |
| R2 | EngineeredSystem (IOF) | IBE store for JSONL archives |
## Preconditions

- Shopify Admin API access token configured as a Wrangler secret
- PlanetScale reachable via Hyperdrive
- `procedure_execution_state` contains a valid watermark (hard floor: 128 days back)
## Main success scenario (ProcedureExecution with Steps)

[PKO: ProcedureExecution | Steps: initiate → request → poll → download → upsert → finalize]
- Step: Initiate — Cron fires → enqueues `PROCEDURE_EXECUTION_INITIATE` message. Ingest Worker creates a `procedure_execution_record` row (`run_id`, `process_initiated_at`, `step=initiated`)
- Step: Request — Worker initiates Shopify `bulkOperationRunQuery` with the orders/line-items query
- Step: Poll — Worker enqueues `PROCEDURE_EXECUTION_POLL` with `bulk_operation_id`. The poll job re-enqueues itself every 16 seconds, up to 128 attempts (~34 min max), until `COMPLETED` or `FAILED`
- Step: Download — On `COMPLETED`, Worker downloads the JSONL results and stores them in R2 (IBE for audit/replay)
- Step: Parse + Upsert — Worker parses the JSONL in chunks and upserts into `commercial_transaction_record` and `transaction_participation_record`
- Step: Transform — Worker refreshes `material_artifact`, rebuilds `sales_measurement_dataset`, and writes a refreshed `performance_measurement_dataset`
- Step: Finalize — Worker updates the watermark in `procedure_execution_state` and marks the `procedure_execution_record` as `completed`
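The Step sequence maps onto an ordered status chain. A minimal sketch, assuming the ExecutionStatus values listed in this document (initiated → polling → downloading → upserting → completed/failed); the `nextStatus` helper is an illustration, not the Worker's actual code:

```typescript
// Ordered happy-path statuses for a ProcedureExecution.
const EXECUTION_STEPS = [
  "initiated",
  "polling",
  "downloading",
  "upserting",
  "completed",
] as const;

type ExecutionStatus = (typeof EXECUTION_STEPS)[number] | "failed";

// Advance to the next status in the chain; terminal states stay put.
function nextStatus(current: ExecutionStatus): ExecutionStatus {
  if (current === "failed" || current === "completed") return current;
  const i = EXECUTION_STEPS.indexOf(current);
  return EXECUTION_STEPS[i + 1];
}
```

A failed run can transition to `failed` from any non-terminal status, which is why `failed` sits outside the ordered chain.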
```mermaid
sequenceDiagram
  participant Cron as CF Cron (EngineeredSystem)
  participant Q as CF Queue
  participant Ingest as Ingest Worker (EngineeredSystem)
  participant Shopify as Shopify API (External)
  participant DB as PlanetScale (IBE Store)
  participant R2 as R2 (IBE Store)
  Cron->>Q: enqueue PROCEDURE_EXECUTION_INITIATE
  Q->>Ingest: deliver batch
  Ingest->>DB: insert procedure_execution_record(initiated)
  Ingest->>Shopify: bulkOperationRunQuery
  Ingest->>Q: enqueue PROCEDURE_EXECUTION_POLL(attempt=1)
  Q->>Ingest: deliver poll job (16s interval)
  Ingest->>Shopify: check bulk status
  Note over Ingest,Q: re-enqueue poll if pending (max 128 attempts)
  alt COMPLETED
    Ingest->>Shopify: download JSONL
    Ingest->>R2: put(JSONL archive IBE)
    Ingest->>DB: upsert commercial_transaction_record (chunked)
    Ingest->>DB: upsert transaction_participation_record (chunked)
    Ingest->>DB: rebuild material_artifact / sales_measurement_dataset / performance_measurement_dataset
    Ingest->>DB: update watermark + finalize execution
  else FAILED/RATE-LIMITED (IssueOccurrence)
    Ingest->>Q: retry with delaySeconds (FallbackStep)
  end
```
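The poll loop in the diagram can be sketched as a pure decision function. Only the 16-second interval and the 128-attempt budget come from the queue contract; `nextPollAction` and its action names are assumptions:

```typescript
interface PollJob {
  attempt: number;          // current attempt, 1-based
  max_attempts: number;     // 128 per the queue contract
  interval_seconds: number; // 16 per the queue contract
}

type BulkStatus = "RUNNING" | "COMPLETED" | "FAILED";

// Decide what the poll Step does next: re-enqueue itself with a delay,
// proceed to the download Step, or mark the execution failed.
function nextPollAction(job: PollJob, status: BulkStatus):
  | { action: "reenqueue"; attempt: number; delaySeconds: number }
  | { action: "download" }
  | { action: "fail"; reason: string } {
  if (status === "COMPLETED") return { action: "download" };
  if (status === "FAILED") return { action: "fail", reason: "bulk operation failed" };
  if (job.attempt >= job.max_attempts) {
    return { action: "fail", reason: "poll budget exhausted (~34 min)" };
  }
  return {
    action: "reenqueue",
    attempt: job.attempt + 1,
    delaySeconds: job.interval_seconds,
  };
}
```

Keeping this decision pure (no I/O) makes the re-enqueue loop easy to unit-test apart from the queue.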
## Exception flows (PKO IssueOccurrences + FallbackSteps)

| IssueOccurrence | FallbackStep |
|---|---|
| Bulk op already running | Delay retry — only one concurrent bulk query on API version 2024-04 |
| Shopify rate limit / transient failure | Queue retry + delay (up to 12 hours) |
| PlanetScale row limit exceeded | Reduce batch size, resume from last committed chunk |
| JSONL parse error | Mark execution as failed, store error in `procedure_execution_record.error_text` |
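One possible shape for the rate-limit FallbackStep's delay schedule. The doubling schedule and 60-second base are assumptions; the table above only fixes the 12-hour cap:

```typescript
// 12 hours, the retry ceiling from the FallbackStep table.
const MAX_RETRY_DELAY_SECONDS = 12 * 60 * 60;

// Exponential backoff for transient Shopify failures: 60s, 120s, 240s, ...
// capped at 12 hours. `attempt` is 1-based.
function retryDelaySeconds(attempt: number, baseSeconds = 60): number {
  const delay = baseSeconds * 2 ** (attempt - 1);
  return Math.min(delay, MAX_RETRY_DELAY_SECONDS);
}
```

The computed value would be passed as the queue message's delay (`delaySeconds` in the diagram above).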
## Queue contracts

### PROCEDURE_EXECUTION_INITIATE

```json
{
  "job_type": "PROCEDURE_EXECUTION_INITIATE",
  "run_id": "uuid",
  "initiator": "cron|user",
  "requested_at": "ISO8601"
}
```

### PROCEDURE_EXECUTION_POLL

```json
{
  "job_type": "PROCEDURE_EXECUTION_POLL",
  "run_id": "uuid",
  "bulk_operation_id": "gid://shopify/BulkOperation/...",
  "attempt": 1,
  "max_attempts": 128,
  "interval_seconds": 16
}
```

## API endpoints

- `POST /api/performance-metrics/sync` — manual trigger (enqueues the same ProcedureExecution)
- `GET /api/ingest/runs` — list recent procedure execution records with status
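The two queue contracts can be expressed as a TypeScript discriminated union, letting the consumer narrow on `job_type`. The interface names are illustrative; the fields mirror the JSON contracts:

```typescript
interface ProcedureExecutionInitiate {
  job_type: "PROCEDURE_EXECUTION_INITIATE";
  run_id: string;             // uuid
  initiator: "cron" | "user";
  requested_at: string;       // ISO8601
}

interface ProcedureExecutionPoll {
  job_type: "PROCEDURE_EXECUTION_POLL";
  run_id: string;
  bulk_operation_id: string;  // gid://shopify/BulkOperation/...
  attempt: number;
  max_attempts: number;       // 128
  interval_seconds: number;   // 16
}

type IngestJob = ProcedureExecutionInitiate | ProcedureExecutionPoll;

// Type guard: narrowing on the job_type discriminant gives the consumer
// type-safe access to poll-only fields like bulk_operation_id.
function isPollJob(job: IngestJob): job is ProcedureExecutionPoll {
  return job.job_type === "PROCEDURE_EXECUTION_POLL";
}
```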
## Performance SLA

Daily incremental ProcedureExecution completes in < 15 minutes (async). Poll budget: 128 attempts × 16s = ~34 min hard ceiling. Chunk DB writes to avoid the 20s transaction timeout.
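Chunking the DB writes to stay under the 20s transaction timeout might look like the sketch below. `upsertBatch` is a hypothetical callback and the 500-row default is an assumption, not a value from this document:

```typescript
// Split rows into fixed-size batches.
function chunk<T>(rows: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < rows.length; i += size) out.push(rows.slice(i, i + size));
  return out;
}

// Upsert rows batch by batch, each batch in its own short transaction,
// so no single transaction approaches the 20s timeout. Returns rows committed.
async function upsertInChunks<T>(
  rows: T[],
  upsertBatch: (batch: T[]) => Promise<void>,
  size = 500,
): Promise<number> {
  let committed = 0;
  for (const batch of chunk(rows, size)) {
    await upsertBatch(batch);
    committed += batch.length; // tracks the resume point after a row-limit IssueOccurrence
  }
  return committed;
}
```

Tracking `committed` is what makes the "resume from last committed chunk" FallbackStep possible.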
## Acceptance criteria

- PASS: `procedure_execution_record` shows a completed execution; `performance_measurement_dataset.last_refreshed` updated; sample MaterialArtifacts show correct 7/30-day sales Measurement Data
- FAIL: Execution ends in `failed`; watermark regressions; duplicate `sales_measurement_dataset` after re-run
## Watermark strategy

The incremental sync uses `procedure_execution_state.orders_last_watermark` to query only orders modified since the last successful run.
- Hard floor: 128 days. If the watermark is older than 128 days (or missing), it resets to `NOW() - 128 days`. This bounds the maximum data volume per incremental run and prevents unbounded Shopify bulk queries.
- Advance on success only: the watermark advances to `MAX(updated_at)` from the fetched batch only after all Steps complete. A failed run leaves the watermark unchanged — the next run retries from the same position.
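The two watermark rules can be sketched as pure helpers. Only the 128-day floor and the advance-on-success semantics come from the text; the function names are hypothetical:

```typescript
const FLOOR_DAYS = 128;
const DAY_MS = 24 * 60 * 60 * 1000;

// Rule 1: clamp the stored watermark to NOW() - 128 days.
// Also covers a missing watermark (null).
function effectiveWatermark(stored: Date | null, now: Date): Date {
  const floor = new Date(now.getTime() - FLOOR_DAYS * DAY_MS);
  if (stored === null || stored < floor) return floor;
  return stored;
}

// Rule 2: advance to MAX(updated_at) of the fetched batch, but only if
// every Step completed. A failed run keeps the old watermark unchanged.
function advanceWatermark(old: Date, batchUpdatedAts: Date[], succeeded: boolean): Date {
  if (!succeeded || batchUpdatedAts.length === 0) return old;
  return new Date(Math.max(...batchUpdatedAts.map((d) => d.getTime())));
}
```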
## Full backfill

Historical data loads beyond the 128-day window use a separate Durable Object workflow rather than the daily sync path. The backfill workflow:
- Accepts a custom date range (no floor constraint)
- Runs as its own ProcedureExecution with a distinct `job_type` (`FULL_BACKFILL_EXECUTE`)
- Uses the same parse/upsert/transform Steps but with larger chunk budgets
- Does not advance the daily watermark — the two are independent
- Can be scheduled alongside the daily sync even though Shopify allows only one bulk op per app: the backfill workflow handles the "bulk op already running" IssueOccurrence with longer retry delays
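A hypothetical message shape for the backfill job: the `FULL_BACKFILL_EXECUTE` job_type comes from the list above, while the date-range field names are assumptions for illustration:

```typescript
interface FullBackfillExecute {
  job_type: "FULL_BACKFILL_EXECUTE";
  run_id: string;
  start_date: string; // ISO8601; no 128-day floor is applied
  end_date: string;   // ISO8601
}

// Build a backfill job. Unlike the daily sync, the range is passed through
// unclamped, and the daily orders_last_watermark is never touched.
function makeBackfillJob(runId: string, start: string, end: string): FullBackfillExecute {
  return { job_type: "FULL_BACKFILL_EXECUTE", run_id: runId, start_date: start, end_date: end };
}
```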
## BFO 7 Buckets

| Bucket | Answer |
|---|---|
| WHAT | Bulk data acquisition ProcedureExecution (commercial transaction records from Shopify) |
| WHO | Ingest Worker (EngineeredSystem), Shopify API (external EngineeredSystem), Cron Trigger |
| WHEN | Daily 6am UTC (Cron Function); on-demand via API |
| WHERE | CF Queue, R2 bucket, PlanetScale tables |
| HOW WE KNOW | `procedure_execution_record`, JSONL archives (IBEs), watermark state |
| HOW IT IS | ExecutionStatus (initiated → polling → downloading → upserting → completed/failed) |
| WHY | Scheduled Function (cron), data currency Disposition |