🔄 Queue System設計 - Shopify API制限対応
📋 AI SUMMARY
System: Cloudflare Queues + Workers Event Processing Purpose: Handle Shopify API rate limits (40 req/min REST, 1000 cost/min GraphQL) with asynchronous processing Key Rules: Idempotent processing, PII-free messages, automatic retry with DLQ, order-based serialization Architecture: Producer-Consumer pattern with rate limiting and failure recovery
🎯 QUICK REFERENCE
- 目的: Shopify API制限 (40req/min) 回避 + UX向上
- パターン: Producer (BFF) → Queue → Consumer (Worker)
- 重要原則: Idempotency Key必須、PII排除、順序制御
- 監視指標: DLQ件数、処理遅延、429エラー率
🔗 INTEGRATION POINTS
- Input Systems: BFF API、フロントエンド操作
- Output Systems: Shopify API、D1更新、メール送信
- Dependencies: Cloudflare Queues、Workers、Shopify webhooks
- Validation: Idempotency確認、重複排除、順序保証
# Queue System ルール(機械可読版)
QUEUE_DESIGN_RULES:
message_format:
- "PII完全排除: IDのみ参照"
- "Idempotency Key必須: brand:op:order_id:version"
- "小粒度イベント: ORDER_CONFIRMED, FULFILLMENT_REQUESTED等"
rate_limiting:
- "Shopify REST: 40req/min (制限値) → 実運用35req/min (余裕5req)"
- "Shopify GraphQL: 1000コスト/min (制限値) → 実運用900コスト/min (余裕100)"
- "Consumer同時実行数: 最大3"
retry_strategy:
- "指数バックオフ: 1, 2, 4, 8, 16秒"
- "最大試行回数: 5回"
- "429/5xx: 自動リトライ"
- "4xx: DLQ直行"
idempotency:
- "処理済みキー表でD1管理"
- "重複検出で早期Return"
- "Shopify側もIdempotency Key使用"
🔄 どこでキューを使うか?
🎯 Phase 1: 最重要キューポイント(必須実装)
1. 注文確定処理
// ❌ 直同期(問題あり)
POST /api/orders/create
→ Shopify Order作成 (40req/min制限)
→ D1更新
→ メール送信
→ レスポンス (遅い + API制限リスク)
// ✅ Queue活用(推奨)
POST /api/orders/create
→ Queue.enqueue("ORDER_CONFIRMED", {order_id, brand})
→ 即座にレスポンス (早い)
Consumer Worker:
→ Shopify Order作成
→ D1更新
→ メール送信Queue投入
2. Shopify → D1 データ同期
// Shopify Webhook受信
POST /webhooks/orders/paid
→ Queue.enqueue("ORDER_PAYMENT_CONFIRMED", {shopify_order_id})
// 定期同期
Scheduled Worker (5分毎)
→ Queue.enqueue("SYNC_PRODUCTS", {brand: "neko", last_cursor})
→ Queue.enqueue("SYNC_ORDERS", {brand: "tokinoe", last_cursor})
🎯 Phase 2: UX改善キューポイント(推奨実装)
3. 大量注文・一括処理
// セール時のバルク注文処理
events = [
{type: "ORDER_CONFIRMED", order_id: "123", brand: "neko"},
{type: "ORDER_CONFIRMED", order_id: "124", brand: "neko"},
// ... 100件
]
// 各イベントを順次キューイング(API制限安全)
for (event of events) {
queue.enqueue(event, {shard_key: event.order_id})
}
4. 画像処理
// 画像アップロード
POST /api/photos/upload
→ R2直接アップロード
→ Queue.enqueue("PHOTO_PROCESS", {photo_id, resize_specs})
→ 即座にレスポンス
Consumer Worker:
→ 画像リサイズ (R2 Image Resizing)
→ サムネイル生成
→ メタデータ更新
5. メール送信
// メール関連は全てキューイング
queue.enqueue("EMAIL_ORDER_CONFIRMED", {order_id})
queue.enqueue("EMAIL_SHIPPING_NOTIFICATION", {order_id})
queue.enqueue("EMAIL_DELIVERY_COMPLETE", {order_id})
// バッチ送信でShopify負荷軽減
Consumer Worker:
→ 10件まとめて処理
→ テンプレート一括生成
6. 工場システム連携
// 工場アプリからの状況更新
POST /api/factory/status-update
→ Queue.enqueue("PRINT_STATUS_CHANGED", {
print_job_id,
status: "printing|completed|shipped"
})
Consumer Worker:
→ Shopify フルフィルメント更新
→ 顧客メール送信
→ 在庫調整
🏗️ 具体的なQueue構成
Queue Type別設計
1. Order Processing Queue
interface OrderEvent {
type: "ORDER_CONFIRMED" | "ORDER_CANCELLED" | "ORDER_FULFILLED"
order_id: string
brand: "neko" | "tokinoe" | "dog"
idempotency_key: string // brand:op:order_id:timestamp
metadata?: {
user_id?: string
shopify_order_id?: string
}
}
2. Sync Queue
interface SyncEvent {
type: "SYNC_PRODUCTS" | "SYNC_ORDERS" | "SYNC_INVENTORY"
brand: string
resource_type: "products" | "orders" | "customers"
cursor?: string // ページネーション
batch_size: number
}
3. Email Queue
interface EmailEvent {
type: "EMAIL_ORDER_CONFIRMED" | "EMAIL_SHIPPED" | "EMAIL_SUPPORT"
template_id: string
recipient_shopify_id: string // PII参照のみ
brand: string
context: Record<string, any> // 非PII変数のみ
}
⚙️ 実装パターン
Producer (BFF API)
// /api/orders/create
export default async function handler(request: Request) {
const { photo_ids, brand } = await request.json()
// 1. 即座に受付
const order_id = generateOrderId()
// 2. Queue投入
await env.ORDER_QUEUE.send({
type: "ORDER_CONFIRMED",
order_id,
brand,
idempotency_key: `${brand}:create:${order_id}:${Date.now()}`,
metadata: { photo_ids }
})
// 3. 即座にレスポンス
return Response.json({
order_id,
status: "processing",
message: "注文を受け付けました。処理中です..."
})
}
Consumer (Queue Worker)
export default {
async queue(batch, env) {
for (const message of batch.messages) {
try {
await processOrderEvent(message.body, env)
message.ack()
} catch (error) {
console.error('処理失敗:', error)
message.retry() // 自動リトライ
}
}
}
}
async function processOrderEvent(event: OrderEvent, env) {
// 1. Idempotency確認
const existing = await env.D1.prepare(
'SELECT id FROM processed_events WHERE idempotency_key = ?'
).bind(event.idempotency_key).first()
if (existing) {
console.log('既に処理済み:', event.idempotency_key)
return
}
// 2. Shopify API呼び出し
const shopifyOrder = await createShopifyOrder(event)
// 3. D1更新
await env.D1.prepare(
'INSERT INTO orders (id, shopify_order_id, brand, status) VALUES (?, ?, ?, ?)'
).bind(event.order_id, shopifyOrder.id, event.brand, 'confirmed').run()
// 4. 処理済み記録
await env.D1.prepare(
'INSERT INTO processed_events (idempotency_key, event_type, processed_at) VALUES (?, ?, ?)'
).bind(event.idempotency_key, event.type, new Date().toISOString()).run()
}
📊 監視・運用設計
必須メトリクス
queue_metrics:
enqueue_rate: "件/分"
processing_rate: "件/分"
success_rate: "%"
retry_count: "回数"
dlq_count: "件数"
latency_p95: "秒"
shopify_api_metrics:
rate_limit_usage: "%" # 40req/min の使用率
error_4xx_rate: "%"
error_5xx_rate: "%"
response_time_p95: "ms"
アラート設定
alerts:
dlq_increase:
threshold: "10件/時間"
action: "Slack通知 + PagerDuty"
rate_limit_exceeded:
threshold: "429エラー > 5件/分"
action: "即座にSlack通知"
processing_delay:
threshold: "p95遅延 > 2分"
action: "調査アラート"
🎯 導入優先順位
Week 1: 基盤構築
- Order Processing Queue実装
- Idempotency機能実装
- 基本的なConsumer作成
Week 2: 安定化
- リトライ・DLQ実装
- メトリクス・ダッシュボード
- E2Eテスト実装
Week 3: 拡張
- Sync Queue実装
- Email Queue実装
- 工場連携Queue検討
Queue System設計完成: ✅ 実装準備完了
API制限対応: ✅ 40req/min安全運用
UX向上: ✅ 即座レスポンス実現
障害耐性: ✅ DLQ + 自動リトライ