
Daily Shopify Bulk Sync

Staff-facing dashboards require fresh Measurement Data derived from commercial transaction records and transaction participation records. The integration is loosely coupled and asynchronous, implemented via Cron + Queue + background ProcedureExecution (PKO).

Goal: Execute a multi-step Procedure that records its progress in procedure_execution_record and produces updated material_artifact and sales_measurement_dataset tables, from which the performance_measurement_dataset table is refreshed for performance tracking and analytics.

[CCO: Agents performing Acts | IOF: EngineeredSystems realizing Functions]

| Actor | Ontological type | Role |
| --- | --- | --- |
| CF Cron Trigger | EngineeredSystem (IOF) bearing Function (BFO) | Fires daily at 6am UTC; realizes scheduled Function |
| Ingest Worker | EngineeredSystem (IOF) | Queue consumer, orchestrates ProcedureExecution |
| Shopify Admin API | External EngineeredSystem (IOF) | Source of commercial transaction Process Records |
| PlanetScale | EngineeredSystem (IOF) | IBE store for Process Records, Measurement Data |
| R2 | EngineeredSystem (IOF) | IBE store for JSONL archives |
Preconditions:

  • Shopify Admin API access token configured as a Wrangler secret
  • PlanetScale reachable via Hyperdrive
  • procedure_execution_state contains a valid watermark (hard floor: 128 days back)

Main success scenario (ProcedureExecution with Steps)


[PKO: ProcedureExecution | Steps: initiate → request → poll → download → upsert → transform → finalize]

  1. Step: Initiate — Cron fires → enqueues PROCEDURE_EXECUTION_INITIATE message
  2. Ingest Worker creates procedure_execution_record row (run_id, process_initiated_at, step=initiated)
  3. Step: Request — Worker initiates Shopify bulkOperationRunQuery with orders/line-items query
  4. Step: Poll — Worker enqueues PROCEDURE_EXECUTION_POLL with bulk_operation_id. Poll job re-enqueues itself every 16 seconds, up to 128 attempts (~34 min max), until COMPLETED or FAILED
  5. Step: Download — On COMPLETED, Worker downloads JSONL results and stores in R2 (IBE for audit/replay)
  6. Step: Parse + Upsert — Worker parses JSONL in chunks and upserts into commercial_transaction_record and transaction_participation_record
  7. Step: Transform — Worker refreshes material_artifact, rebuilds sales_measurement_dataset, writes refreshed performance_measurement_dataset
  8. Step: Finalize — Worker updates watermark in procedure_execution_state and marks procedure_execution_record as completed
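The Steps above imply a linear status progression for each procedure_execution_record. A minimal sketch, using the ExecutionStatus values this document lists elsewhere (the function name is an assumption, not the actual worker code):

```typescript
// Status progression for a procedure_execution_record (names per ExecutionStatus).
type ExecutionStatus =
  | "initiated" | "polling" | "downloading" | "upserting" | "completed" | "failed";

const HAPPY_PATH: ExecutionStatus[] = [
  "initiated", "polling", "downloading", "upserting", "completed",
];

// Advance to the next status on the happy path; terminal statuses stay put.
function nextStatus(current: ExecutionStatus): ExecutionStatus {
  const i = HAPPY_PATH.indexOf(current);
  if (i === -1 || i === HAPPY_PATH.length - 1) return current; // "failed" or "completed"
  return HAPPY_PATH[i + 1];
}
```

Keeping the progression strictly linear means a crashed worker can resume by re-reading the stored status rather than re-deriving it.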
```mermaid
sequenceDiagram
  participant Cron as CF Cron (EngineeredSystem)
  participant Q as CF Queue
  participant Ingest as Ingest Worker (EngineeredSystem)
  participant Shopify as Shopify API (External)
  participant DB as PlanetScale (IBE Store)
  participant R2 as R2 (IBE Store)

  Cron->>Q: enqueue PROCEDURE_EXECUTION_INITIATE
  Q->>Ingest: deliver batch
  Ingest->>DB: insert procedure_execution_record(initiated)
  Ingest->>Shopify: bulkOperationRunQuery
  Ingest->>Q: enqueue PROCEDURE_EXECUTION_POLL(attempt=1)
  Q->>Ingest: deliver poll job (16s interval)
  Ingest->>Shopify: check bulk status
  Note over Ingest,Q: re-enqueue poll if pending (max 128 attempts)
  alt COMPLETED
    Ingest->>Shopify: download JSONL
    Ingest->>R2: put(JSONL archive IBE)
    Ingest->>DB: upsert commercial_transaction_record (chunked)
    Ingest->>DB: upsert transaction_participation_record (chunked)
    Ingest->>DB: rebuild material_artifact / sales_measurement_dataset / performance_measurement_dataset
    Ingest->>DB: update watermark + finalize execution
  else FAILED/RATE-LIMITED (IssueOccurrence)
    Ingest->>Q: retry with delaySeconds (FallbackStep)
  end
```
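The poll loop's decision at each delivery can be sketched as a pure function. This is an illustrative model of the behavior described above (field names follow the PROCEDURE_EXECUTION_POLL payload; the function itself is an assumption):

```typescript
// One poll-step decision: re-enqueue, proceed to download, or fail the execution.
interface PollJob { attempt: number; maxAttempts: number; intervalSeconds: number; }

type BulkStatus = "CREATED" | "RUNNING" | "COMPLETED" | "FAILED";

type PollOutcome =
  | { action: "reenqueue"; delaySeconds: number; nextAttempt: number }
  | { action: "proceed" }                 // COMPLETED → Download step
  | { action: "fail"; reason: string };   // FAILED or poll budget exhausted

function decidePoll(job: PollJob, status: BulkStatus): PollOutcome {
  if (status === "COMPLETED") return { action: "proceed" };
  if (status === "FAILED") return { action: "fail", reason: "bulk operation failed" };
  if (job.attempt >= job.maxAttempts) {
    return { action: "fail", reason: "poll budget exhausted" };
  }
  return {
    action: "reenqueue",
    delaySeconds: job.intervalSeconds,      // 16s per the message schema
    nextAttempt: job.attempt + 1,
  };
}
```

With maxAttempts = 128 and intervalSeconds = 16, the budget-exhausted branch enforces the ~34-minute hard ceiling stated in the performance notes.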

Exception flows (PKO IssueOccurrences + FallbackSteps)

| IssueOccurrence | FallbackStep |
| --- | --- |
| Bulk op already running | Delay retry; only one concurrent bulk query on API version 2024-04 |
| Shopify rate limit / transient failure | Queue retry + delay (up to 12 hours) |
| PlanetScale row limit exceeded | Reduce batch size, resume from last committed chunk |
| JSONL parse error | Mark execution as failed, store error in procedure_execution_record.error_text |
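For the rate-limit / transient-failure row, a capped backoff keeps retries inside the 12-hour window. The base delay and doubling policy here are assumptions, not taken from the source; only the 12-hour cap is:

```typescript
// Backoff for transient-failure retries, capped at the 12-hour retry window.
const MAX_RETRY_DELAY_SECONDS = 12 * 60 * 60; // 12 hours

function retryDelaySeconds(attempt: number, baseSeconds = 60): number {
  // Double the delay each attempt (assumed policy), never exceeding the cap.
  const delay = baseSeconds * 2 ** (attempt - 1);
  return Math.min(delay, MAX_RETRY_DELAY_SECONDS);
}
```

The computed value would be passed as the queue message's delaySeconds when re-enqueueing the job.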

PROCEDURE_EXECUTION_INITIATE

```json
{
  "job_type": "PROCEDURE_EXECUTION_INITIATE",
  "run_id": "uuid",
  "initiator": "cron|user",
  "requested_at": "ISO8601"
}
```

PROCEDURE_EXECUTION_POLL

```json
{
  "job_type": "PROCEDURE_EXECUTION_POLL",
  "run_id": "uuid",
  "bulk_operation_id": "gid://shopify/BulkOperation/...",
  "attempt": 1,
  "max_attempts": 128,
  "interval_seconds": 16
}
```
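The two payloads above form a discriminated union on job_type, which lets a single queue consumer dispatch both job kinds. The interface names and the guard function are illustrative assumptions; the field names mirror the JSON exactly:

```typescript
// Message shapes mirroring the payloads above.
interface ProcedureExecutionInitiate {
  job_type: "PROCEDURE_EXECUTION_INITIATE";
  run_id: string;          // UUID
  initiator: "cron" | "user";
  requested_at: string;    // ISO 8601 timestamp
}

interface ProcedureExecutionPoll {
  job_type: "PROCEDURE_EXECUTION_POLL";
  run_id: string;
  bulk_operation_id: string; // e.g. "gid://shopify/BulkOperation/..."
  attempt: number;
  max_attempts: number;
  interval_seconds: number;
}

type IngestMessage = ProcedureExecutionInitiate | ProcedureExecutionPoll;

// Narrowing on job_type selects the right handler in the consumer.
function isPoll(msg: IngestMessage): msg is ProcedureExecutionPoll {
  return msg.job_type === "PROCEDURE_EXECUTION_POLL";
}
```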
  • POST /api/performance-metrics/sync — manual trigger (enqueues same ProcedureExecution)
  • GET /api/ingest/runs — list recent procedure execution records with status

Target: the daily incremental ProcedureExecution completes in under 15 minutes (async). Poll budget: 128 attempts × 16 s ≈ 34 minutes hard ceiling. DB writes are chunked to stay under the 20-second transaction timeout.
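Chunking the upserts is what keeps each transaction under the timeout. A minimal generic helper (the chunk size itself is tuned per table and is not specified in this document):

```typescript
// Split rows into fixed-size chunks so each upsert transaction stays small.
function chunk<T>(rows: T[], size: number): T[][] {
  if (size <= 0) throw new Error("chunk size must be positive");
  const out: T[][] = [];
  for (let i = 0; i < rows.length; i += size) {
    out.push(rows.slice(i, i + size));
  }
  return out;
}
```

Each chunk is committed independently, which is also what makes the "resume from last committed chunk" FallbackStep possible after a row-limit error.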

  • PASS: procedure_execution_record shows a completed execution; performance_measurement_dataset.last_refreshed is updated; sampled MaterialArtifacts show correct 7/30-day sales Measurement Data
  • FAIL: execution ends in the failed status; the watermark regresses; duplicate sales_measurement_dataset rows appear after a re-run

The incremental sync uses procedure_execution_state.orders_last_watermark to query only orders modified since the last successful run.

  • Hard floor: 128 days. If the watermark is older than 128 days (or missing), it resets to NOW() - 128 days. This bounds the maximum data volume per incremental run and prevents unbounded Shopify bulk queries.
  • Advance on success only: the watermark advances to MAX(updated_at) from the fetched batch only after all Steps complete. A failed run leaves the watermark unchanged — the next run retries from the same position.
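The two watermark rules above can be expressed directly. This is a sketch of the stated policy, not the actual implementation (function names are assumptions):

```typescript
// Watermark policy: 128-day hard floor, advance on success only.
const FLOOR_DAYS = 128;
const DAY_MS = 24 * 60 * 60 * 1000;

// Clamp a missing or stale watermark to NOW() - 128 days.
function effectiveWatermark(stored: Date | null, now: Date): Date {
  const floor = new Date(now.getTime() - FLOOR_DAYS * DAY_MS);
  if (stored === null || stored < floor) return floor;
  return stored;
}

// Advance to MAX(updated_at) of the batch only if every Step completed.
function advanceWatermark(
  current: Date,
  batchMaxUpdatedAt: Date,
  allStepsSucceeded: boolean,
): Date {
  return allStepsSucceeded ? batchMaxUpdatedAt : current; // failed run: unchanged
}
```

Because a failed run returns the unchanged watermark, the next run re-fetches from the same position, making re-runs idempotent at the query boundary.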

Historical data loads beyond the 128-day window use a separate Durable Object workflow rather than the daily sync path. The backfill workflow:

  • Accepts a custom date range (no floor constraint)
  • Runs as its own ProcedureExecution with a distinct job_type (FULL_BACKFILL_EXECUTE)
  • Uses the same parse/upsert/transform Steps but with larger chunk budgets
  • Does not advance the daily watermark — the two are independent
  • Can be scheduled alongside the daily sync even though Shopify allows only one bulk operation per app: the backfill workflow handles the resulting "bulk op already running" IssueOccurrence with longer retry delays
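The independence of the two watermarks can be made explicit at dispatch time. A trivial guard, assuming the job-type names above (the function is illustrative):

```typescript
// Only daily-sync executions may advance orders_last_watermark at Finalize;
// FULL_BACKFILL_EXECUTE is independent and never touches it.
type IngestJobType =
  | "PROCEDURE_EXECUTION_INITIATE"
  | "PROCEDURE_EXECUTION_POLL"
  | "FULL_BACKFILL_EXECUTE";

function mayAdvanceDailyWatermark(jobType: IngestJobType): boolean {
  return jobType !== "FULL_BACKFILL_EXECUTE";
}
```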
| Bucket | Answer |
| --- | --- |
| WHAT | Bulk data acquisition ProcedureExecution (commercial transaction records from Shopify) |
| WHO | Ingest Worker (EngineeredSystem), Shopify API (external EngineeredSystem), Cron Trigger |
| WHEN | Daily 6am UTC (Cron Function); on-demand via API |
| WHERE | CF Queue, R2 bucket, PlanetScale tables |
| HOW WE KNOW | procedure_execution_record, JSONL archives (IBEs), watermark state |
| HOW IT IS | ExecutionStatus (initiated → polling → downloading → upserting → completed/failed) |
| WHY | Scheduled Function (cron), data currency Disposition |