
Daily Shopify Bulk Sync

Staff-facing dashboards require fresh sales metrics derived from order headers and order line items. The integration is loosely coupled and asynchronous, implemented via Cron + CF Workflow steps.

Goal: Execute a multi-step CF Workflow that upserts order_header / order_line_item → refreshes product → writes measurement.sales and measurement.performance to R2 for performance tracking and analytics.

| Method | Path | Spec | Description |
| --- | --- | --- | --- |
| POST | /api/performance-metrics/sync | analytics.yaml | Manual trigger (starts the same CF Workflow) |
| GET | /api/ingest/runs | analytics.yaml | List recent sync telemetry records with status |
| GET | /orders | order.yaml | List orders (staff, paginated) |
| GET | /orders/{order_id} | order.yaml | Single order detail |
| GET | /analytics/commerce/overview | analytics.yaml | Commerce summary KPIs for date range |
| GET | /analytics/pipelines | analytics.yaml | Pipeline health (SYNC_TELEMETRY stream) |
| Table | Module | Role |
| --- | --- | --- |
| order_header | Order | Write: upserted from Shopify bulk data |
| order_line_item | Order | Write: upserted from Shopify bulk data |
| product | Product | Write: refreshed after order upsert |
| Resource | Type | Purpose |
| --- | --- | --- |
| CF Cron Trigger | Cron | Daily 6am UTC trigger |
| ingest | Worker (CF Workflow) | Orchestrates multi-step sync via Durable Objects |
| SYNC_TELEMETRY | Pipeline | Execution audit trail to R2 |
| COMMERCE_TELEMETRY | Pipeline | Order/fulfillment event streaming |
| R2 bucket | Object storage | JSONL archives, measurement data, telemetry |
| Durable Object | State store | Watermark tracking |
| Actor | Role |
| --- | --- |
| CF Cron Trigger | Fires daily at 6am UTC (scheduled trigger) |
| CF Workflow | Orchestrates the multi-step sync via Durable Objects |
| Shopify Admin API | Source of order data |
| PlanetScale | Database for order records and metrics |
| R2 | Object storage for JSONL archives |
  • Shopify Admin API access token configured as a Wrangler secret
  • PlanetScale reachable via Hyperdrive
  • Durable Object contains a valid watermark (hard floor: 128 days back)
  1. Step: Initiate — Cron fires → triggers CF Workflow run
  2. Workflow records telemetry to sync.telemetry on R2 (run_id, process_initiated_at, step=initiated)
  3. Step: Request — Workflow initiates Shopify bulkOperationRunQuery with orders/line-items query
  4. Step: Poll — Workflow polls Shopify with bulk_operation_id every 16 seconds, up to 128 attempts (~34 min max), until COMPLETED or FAILED
  5. Step: Download — On COMPLETED, Workflow downloads JSONL results and stores in R2 (stored file for audit/replay)
  6. Step: Parse + Upsert — Workflow parses JSONL in chunks and upserts into order_header and order_line_item
  7. Step: Transform — Workflow refreshes product, writes measurement.sales and measurement.performance to R2
  8. Step: Finalize — Workflow updates watermark in Durable Object and writes completion to sync.telemetry on R2
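
The parse + upsert step (6) can be sketched as pure helpers. The record shapes and chunk size here are assumptions; the one real convention is that Shopify bulk JSONL attaches child rows to parents via a __parentId field:

```typescript
// Hypothetical sketch of step 6: split bulk JSONL into order-header and
// line-item batches for chunked upserts. Field names are assumptions,
// except __parentId, which Shopify bulk results use to link child rows.
interface OrderHeaderRow { id: string; [key: string]: unknown; }
interface OrderLineItemRow { id: string; __parentId: string; [key: string]: unknown; }

function parseJsonl(jsonl: string): { headers: OrderHeaderRow[]; lines: OrderLineItemRow[] } {
  const headers: OrderHeaderRow[] = [];
  const lines: OrderLineItemRow[] = [];
  for (const raw of jsonl.split("\n")) {
    if (!raw.trim()) continue;           // skip blank trailing lines
    const rec = JSON.parse(raw);
    if (rec.__parentId) lines.push(rec); // child row (order line item)
    else headers.push(rec);              // top-level row (order header)
  }
  return { headers, lines };
}

// Split a batch into fixed-size chunks so each upsert stays within
// PlanetScale's transaction limits.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) out.push(items.slice(i, i + size));
  return out;
}
```

Each chunk would then be upserted in its own transaction, so a failed run can resume from the last committed chunk.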
```mermaid
sequenceDiagram
  participant Cron as CF Cron
  participant WF as CF Workflow
  participant Shopify as Shopify API
  participant DB as PlanetScale
  participant R2 as R2

  Cron->>WF: trigger workflow run
  WF->>R2: write sync.telemetry(initiated)
  WF->>Shopify: bulkOperationRunQuery
  WF->>Shopify: poll bulk status (16s interval)
  Note over WF,Shopify: retry poll if pending (max 128 attempts)
  alt COMPLETED
    WF->>Shopify: download JSONL
    WF->>R2: put(JSONL archive)
    WF->>DB: upsert order_header (chunked)
    WF->>DB: upsert order_line_item (chunked)
    WF->>DB: refresh product
    WF->>R2: write measurement.sales + measurement.performance
    WF->>R2: write sync.telemetry(completed) + update DO watermark
  else FAILED/RATE-LIMITED
    WF->>WF: retry with backoff
  end
```
| Error condition | Recovery |
| --- | --- |
| Bulk op already running | Delay and retry; only one concurrent bulk query is allowed on API version 2024-04 |
| Shopify rate limit / transient failure | Queue retry with delay (up to 12 hours) |
| PlanetScale row limit exceeded | Reduce batch size and resume from the last committed chunk |
| JSONL parse error | Mark the execution as failed and store the error in sync.telemetry on R2 |
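
One way to encode the recovery table is a small classifier that maps each error condition to a retry decision. The error names and specific delays below are assumptions; only the one-bulk-op limit, the 12-hour budget, and the fail-on-parse-error rule come from the table:

```typescript
// Hypothetical sketch: map error conditions from the recovery table to
// retry behavior. Delay values are assumptions, not the real config.
type SyncError = "BULK_OP_RUNNING" | "RATE_LIMITED" | "ROW_LIMIT" | "PARSE_ERROR";

interface Recovery { retry: boolean; delayMs: number; }

function recoveryFor(err: SyncError, attempt: number): Recovery {
  switch (err) {
    case "BULK_OP_RUNNING":
      // only one concurrent bulk query per app: back off linearly and retry
      return { retry: true, delayMs: 60_000 * attempt };
    case "RATE_LIMITED":
      // exponential backoff, capped at the 12-hour retry budget
      return { retry: true, delayMs: Math.min(2 ** attempt * 1_000, 12 * 3600_000) };
    case "ROW_LIMIT":
      // retry immediately with a smaller batch, resuming from the last chunk
      return { retry: true, delayMs: 0 };
    case "PARSE_ERROR":
      // not retryable: fail the run and record the error in sync.telemetry
      return { retry: false, delayMs: 0 };
  }
}
```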

Workflow trigger payload

```json
{
  "run_id": "uuid",
  "initiator": "cron|user",
  "requested_at": "ISO8601"
}
```
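
A TypeScript shape and minimal guard for that payload might look like the following; the field names come from the payload above, while the guard itself is an assumption:

```typescript
// Sketch of the trigger payload shape plus a minimal runtime guard.
interface TriggerPayload {
  run_id: string;                // UUID for this workflow run
  initiator: "cron" | "user";    // scheduled vs. manual trigger
  requested_at: string;          // ISO 8601 timestamp
}

function isTriggerPayload(v: unknown): v is TriggerPayload {
  const p = v as TriggerPayload;
  return typeof p === "object" && p !== null
    && typeof p.run_id === "string"
    && (p.initiator === "cron" || p.initiator === "user")
    && !Number.isNaN(Date.parse(p.requested_at)); // rejects missing/garbled timestamps
}
```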

The daily incremental CF Workflow completes in under 15 minutes (async). Poll budget: 128 attempts x 16s = ~34 min hard ceiling. DB writes are chunked to avoid the 20s transaction timeout.
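
The poll budget can be expressed as a loop with an injectable status check and sleep, which keeps the ~34-minute ceiling easy to verify. The function and status names are assumptions; treating an exhausted budget as FAILED is also an assumption:

```typescript
// Hypothetical sketch of step 4's poll loop: 16s interval, 128 attempts.
type BulkStatus = "RUNNING" | "COMPLETED" | "FAILED";

async function pollBulkOp(
  check: () => Promise<BulkStatus>,     // queries Shopify for the bulk op status
  sleep: (ms: number) => Promise<void>, // injectable so tests can skip real waits
  maxAttempts = 128,
  intervalMs = 16_000,
): Promise<BulkStatus> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const status = await check();
    if (status !== "RUNNING") return status; // COMPLETED or FAILED ends the loop
    if (attempt < maxAttempts) await sleep(intervalMs);
  }
  return "FAILED"; // budget exhausted (128 x 16s ≈ 34 min)
}
```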

  • PASS: sync.telemetry on R2 shows completed execution; measurement.performance on R2 updated; sample products show correct 7/30-day sales metrics
  • FAIL: execution ends in a failed state; the watermark regresses; duplicate measurement.sales entries appear after a re-run

The incremental sync uses the watermark in the Durable Object to query only orders modified since the last successful run.

  • Hard floor: 128 days. If the watermark is older than 128 days (or missing), it resets to NOW() - 128 days. This bounds the maximum data volume per incremental run and prevents unbounded Shopify bulk queries.
  • Advance on success only: the watermark advances to MAX(updated_at) from the fetched batch only after all Steps complete. A failed run leaves the watermark unchanged — the next run retries from the same position.
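
The two watermark rules above can be sketched as pure functions; the names are assumptions, but the 128-day floor and advance-on-success-only semantics come straight from the bullets:

```typescript
// Sketch of the watermark rules: clamp to the 128-day floor, advance only
// after a fully successful run.
const FLOOR_DAYS = 128;
const DAY_MS = 24 * 3600 * 1000;

function effectiveWatermark(stored: Date | null, now: Date): Date {
  const floor = new Date(now.getTime() - FLOOR_DAYS * DAY_MS);
  // missing or older than the floor: reset to NOW() - 128 days
  if (!stored || stored < floor) return floor;
  return stored;
}

function advanceWatermark(current: Date, batchUpdatedAt: Date[], succeeded: boolean): Date {
  // a failed run (or an empty batch) leaves the watermark unchanged
  if (!succeeded || batchUpdatedAt.length === 0) return current;
  const max = new Date(Math.max(...batchUpdatedAt.map(d => d.getTime())));
  return max > current ? max : current; // never regress
}
```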

Historical data loads beyond the 128-day window use a separate Durable Object workflow rather than the daily sync path. The backfill workflow:

  • Accepts a custom date range (no floor constraint)
  • Runs as its own workflow execution with a distinct job_type (FULL_BACKFILL_EXECUTE)
  • Uses the same parse/upsert/transform Steps but with larger chunk budgets
  • Does not advance the daily watermark — the two are independent
  • Can run concurrently with daily sync since Shopify allows one bulk op per app, but the backfill workflow handles the “bulk op already running” error with longer retry delays
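
One way to keep each backfill bulk query bounded despite the missing floor is to split the custom date range into fixed windows and run the same Steps per window. The helper name and window size are assumptions, not part of the design above:

```typescript
// Hypothetical sketch: split a backfill range into fixed-size windows so
// each bulk query covers a bounded slice of history.
function backfillWindows(start: Date, end: Date, windowDays: number): Array<[Date, Date]> {
  const out: Array<[Date, Date]> = [];
  const step = windowDays * 24 * 3600 * 1000;
  for (let t = start.getTime(); t < end.getTime(); t += step) {
    // clamp the final window so it never overshoots the requested end
    out.push([new Date(t), new Date(Math.min(t + step, end.getTime()))]);
  }
  return out;
}
```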
| Bucket | Answer |
| --- | --- |
| WHAT | Bulk data acquisition workflow (order data from Shopify) |
| WHO | CF Workflow, Shopify API, Cron Trigger |
| WHEN | Daily 6am UTC (cron); on-demand via POST /api/performance-metrics/sync |
| WHERE | CF Workflow / Durable Objects, R2 bucket, PlanetScale tables |
| HOW WE KNOW | sync.telemetry on R2, JSONL archives (R2 objects), watermark in Durable Object |
| HOW IT IS | Execution status: initiated → polling → downloading → upserting → completed/failed |
| WHY | Keep dashboard data fresh; scheduled data currency |