Skip to main content

D1-Shopify同期システム:不整合ゼロアーキテクチャ

🎯 設計原則

同期システムの基本方針

const syncPrinciples = {
singleSource: {
principle: "Shopify = 単一真実源",
implementation: "全ての状態変更はShopifyから開始",
guarantee: "Shopifyの状態が常に正しい"
},

eventDriven: {
principle: "イベントドリブン同期",
implementation: "Webhook + ポーリングのハイブリッド",
reliability: "失敗時の自動リトライ + 手動修復"
},

consistency: {
principle: "結果整合性",
implementation: "5分以内の最終的整合性保証",
monitoring: "不整合検出アラート"
},

performance: {
principle: "40req/min制限完全遵守",
implementation: "インテリジェントレート制御",
optimization: "バッチ処理 + キャッシュ活用"
}
}

🔄 同期アーキテクチャ

データフロー全体像

graph TD
A[🔥 Firebase Auth] --> B[⚡ Workers BFF]
B --> C[🏪 Shopify Admin API]
C --> D[📦 Shopify Metafields]

E[🔔 Shopify Webhooks] --> B
B --> F[🗄️ D1 Cache]

G[⏰ Cron Jobs] --> B
B --> H[🔍 Consistency Check]

I[🖼️ R2 Storage] --> B

style C fill:#95f985
style F fill:#85c9f5
style E fill:#f985d3

1. リアルタイム同期(Webhook)

// 🔔 Webhook受信システム
interface WebhookHandler {

// サポートするWebhookイベント
supportedEvents: [
'orders/create', // 新規注文作成
'orders/updated', // 注文状態変更
'orders/paid', // 決済完了
'orders/fulfilled', // 発送完了
'orders/cancelled', // 注文キャンセル
'customers/create', // 新規顧客登録
'customers/update' // 顧客情報更新
],

// Webhook処理フロー
processWebhook: async (event: ShopifyWebhook) => {
try {
// 1. 署名検証
const isValid = await verifyShopifySignature(event)
if (!isValid) throw new Error('Invalid webhook signature')

// 2. 重複処理防止
const isDuplicate = await checkDuplicateEvent(event.id)
if (isDuplicate) return { status: 'already_processed' }

// 3. イベント種別に応じた処理
switch (event.topic) {
case 'orders/create':
await handleOrderCreate(event.data)
break
case 'orders/updated':
await handleOrderUpdate(event.data)
break
// ... 他のイベント
}

// 4. 処理完了記録
await recordWebhookProcessed(event.id)

return { status: 'success', processed_at: new Date().toISOString() }

} catch (error) {
// エラー時は後でリトライ
await scheduleRetry(event, error)
throw error
}
}
}

// 📦 注文作成時の同期処理
async function handleOrderCreate(order: ShopifyOrder): Promise<void> {
// メタフィールド抽出
const metadata = extractMetafieldsFromOrder(order)

// D1キャッシュ更新
await updateD1OrderCache({
shopify_order_id: order.id,
customer_id: order.customer?.id,
firebase_uid: metadata.customer?.firebase_uid,
brand: metadata.order?.brand_context?.brand,
status: 'created',
line_items: order.line_items.map(item => ({
id: item.id,
photo_file: metadata.line_items[item.id]?.photo_file,
print_details: metadata.line_items[item.id]?.print_details
})),
created_at: order.created_at,
updated_at: new Date().toISOString()
})

// 処理キューへ追加
await addToProcessingQueue({
order_id: order.id,
priority: determinePriority(metadata.order?.brand_context?.brand),
estimated_completion: calculateEstimatedCompletion(order.line_items.length)
})
}

2. ポーリング同期(フォールバック)

// ⏰ 定期同期システム(Webhook失敗時の保険)
interface PollingSync {

schedule: "*/5 * * * *", // 5分毎実行

// アクティブオーダーの状態同期
syncActiveOrders: async () => {
const activeOrders = await getActiveOrdersFromD1()

for (const cachedOrder of activeOrders) {
try {
// Shopifyから最新状態取得
const shopifyOrder = await fetchShopifyOrder(cachedOrder.shopify_order_id)

// 状態比較
const hasChanged = compareOrderState(cachedOrder, shopifyOrder)

if (hasChanged) {
// D1キャッシュ更新
await updateD1OrderCache(shopifyOrder)

// 差分ログ記録
await logStateChange({
order_id: cachedOrder.shopify_order_id,
old_state: cachedOrder,
new_state: shopifyOrder,
sync_method: 'polling'
})
}

} catch (error) {
await logSyncError(cachedOrder.shopify_order_id, error)
}
}
},

// Firebase UID ↔ Shopify Customer ID マッピング同期
syncCustomerMapping: async () => {
const recentCustomers = await getRecentCustomersFromD1()

for (const customer of recentCustomers) {
const shopifyCustomer = await fetchShopifyCustomer(customer.shopify_id)
const firebaseUID = shopifyCustomer.metafields?.find(
m => m.namespace === 'nekomata' && m.key === 'firebase_uid'
)?.value

if (firebaseUID && firebaseUID !== customer.firebase_uid) {
await updateCustomerMapping(customer.shopify_id, firebaseUID)
}
}
}
}

3. 不整合検出・修復システム

// 🔍 データ整合性監視システム
interface ConsistencyChecker {

// 不整合パターン定義
inconsistencyPatterns: [
'missing_order_in_d1', // ShopifyにあるのにD1にない注文
'missing_order_in_shopify', // D1にあるのにShopifyにない注文
'status_mismatch', // 注文状態の不一致
'metafield_mismatch', // メタフィールド値の不一致
'photo_file_missing', // 写真ファイル参照の不整合
'customer_mapping_error' // Firebase UID マッピングエラー
],

// 整合性チェック実行
runConsistencyCheck: async (): Promise<InconsistencyReport> => {
const report: InconsistencyReport = {
timestamp: new Date().toISOString(),
inconsistencies: [],
auto_fixed: 0,
manual_review_required: 0
}

// パターン1: 欠落した注文の検出
const missingOrders = await detectMissingOrders()
for (const missing of missingOrders) {
if (missing.age_minutes < 60) {
// 1時間以内 → 自動修復
await autoFixMissingOrder(missing)
report.auto_fixed++
} else {
// 古い不整合 → 手動レビュー
report.inconsistencies.push(missing)
report.manual_review_required++
}
}

// パターン2: 状態不一致の検出
const statusMismatches = await detectStatusMismatches()
for (const mismatch of statusMismatches) {
// Shopifyを正として修復
await fixStatusMismatch(mismatch)
report.auto_fixed++
}

return report
},

// 自動修復ロジック
autoFixMissingOrder: async (missing: MissingOrder) => {
if (missing.location === 'shopify_only') {
// ShopifyにあるのにD1にない → D1に追加
const shopifyOrder = await fetchShopifyOrder(missing.order_id)
await createD1OrderCache(shopifyOrder)

} else if (missing.location === 'd1_only') {
// D1にあるのにShopifyにない → 削除または調査フラグ
const d1Order = await getD1Order(missing.order_id)
if (d1Order.age_minutes > 1440) { // 24時間以上古い
await archiveD1Order(missing.order_id)
} else {
await flagForManualReview(missing.order_id)
}
}
}
}

🚨 エラーハンドリング・リトライ戦略

段階的リトライシステム

// 🔄 インテリジェントリトライ
interface RetryStrategy {

// リトライパターン定義
patterns: {
webhook_timeout: {
max_attempts: 5,
backoff: 'exponential', // 1s, 2s, 4s, 8s, 16s
fallback: 'schedule_polling_sync'
},

rate_limit_hit: {
max_attempts: 10,
backoff: 'rate_aware', // API制限を考慮した待機
fallback: 'batch_queue'
},

network_error: {
max_attempts: 3,
backoff: 'linear', // 5s, 10s, 15s
fallback: 'dead_letter_queue'
}
},

// リトライ実行
executeWithRetry: async <T>(
operation: () => Promise<T>,
errorType: keyof RetryStrategy['patterns']
): Promise<T> => {
const pattern = this.patterns[errorType]
let lastError: Error

for (let attempt = 1; attempt <= pattern.max_attempts; attempt++) {
try {
return await operation()
} catch (error) {
lastError = error

if (attempt < pattern.max_attempts) {
const delay = calculateBackoffDelay(pattern.backoff, attempt)
await sleep(delay)
continue
}
}
}

// 全リトライ失敗 → フォールバック戦略
await executeFallbackStrategy(pattern.fallback, lastError)
throw lastError
}
}

40req/min制限管理

// ⚡ レート制限インテリジェント管理
interface RateLimitManager {

// 制限状況リアルタイム追跡
currentLimits: {
used: number, // 現在の使用量
remaining: number, // 残り回数
reset_time: Date, // リセット時刻
bucket_size: 40 // バケットサイズ
},

// API呼び出し前チェック
checkAndReserve: async (requestCount: number = 1): Promise<boolean> => {
if (this.currentLimits.remaining < requestCount) {
const waitTime = this.currentLimits.reset_time.getTime() - Date.now()
if (waitTime > 0) {
// 制限リセット待ち
await sleep(waitTime + 1000) // 1秒余裕を持つ
}
}

// 使用量更新
this.currentLimits.used += requestCount
this.currentLimits.remaining -= requestCount

return true
},

// バッチ処理最適化
optimizeBatchRequests: async (requests: APIRequest[]): Promise<APIRequest[][]> => {
const batches: APIRequest[][] = []
let currentBatch: APIRequest[] = []

for (const request of requests) {
if (currentBatch.length < this.currentLimits.remaining) {
currentBatch.push(request)
} else {
batches.push(currentBatch)
currentBatch = [request]
}
}

if (currentBatch.length > 0) {
batches.push(currentBatch)
}

return batches
}
}

📊 監視・アラートシステム

パフォーマンス監視

// 📈 同期パフォーマンス監視
interface SyncMonitoring {

// KPI定義
kpis: {
sync_latency: "Webhook→D1更新の平均時間",
consistency_rate: "データ整合性率",
error_rate: "同期エラー発生率",
api_utilization: "Shopify API使用率"
},

// アラート条件
alertThresholds: {
sync_latency: { warning: 5000, critical: 15000 }, // ms
consistency_rate: { warning: 0.99, critical: 0.95 }, // %
error_rate: { warning: 0.01, critical: 0.05 }, // %
api_utilization: { warning: 0.8, critical: 0.95 } // %
},

// リアルタイムダッシュボード
dashboard: {
metrics: [
"現在の同期待ちキュー長",
"過去24時間の同期成功/失敗数",
"API制限使用状況",
"不整合検出・修復状況"
],

alerts: [
"同期遅延アラート",
"不整合大量発生アラート",
"API制限近接アラート"
]
}
}

運用自動化

// 🤖 自動運用システム
interface AutomatedOperations {

// 自動スケーリング
autoScaling: {
trigger: "キュー長 > 100",
action: "並列処理ワーカー増加",
max_workers: 10
},

// 自動修復
autoHealing: {
scenario: "連続失敗検出",
actions: [
"失敗したWebhookの再処理",
"ポーリング同期の頻度一時増加",
"緊急時手動同期トリガー"
]
},

// プロアクティブメンテナンス
maintenance: {
schedule: "毎日AM3:00",
tasks: [
"古いログデータのアーカイブ",
"D1キャッシュの最適化",
"不整合データの完全スキャン"
]
}
}

🚀 実装ロードマップ

Phase 1: 基本同期システム(2週間)

基本機能:
- Webhook受信・検証システム
- 基本的なD1キャッシュ更新
- 重複処理防止メカニズム
- シンプルなエラーログ

実装優先度: Critical
期待効果: "基本的な同期動作の確立"

Phase 2: 高信頼性システム(1ヶ月)

信頼性強化:
- リトライ戦略の実装
- ポーリング同期のフォールバック
- 不整合検出・自動修復
- API制限インテリジェント管理

実装優先度: High
期待効果: "99.9%の同期成功率達成"

Phase 3: エンタープライズ運用(2ヶ月)

運用自動化:
- 高度な監視・アラートシステム
- 自動スケーリング・自動修復
- パフォーマンス最適化
- 運用ダッシュボード

実装優先度: Medium
期待効果: "完全自動運用の実現"

🎯 成功指標

技術的KPI

const technicalKPIs = {
availability: "99.9%以上のシステム稼働率",
consistency: "99.95%以上のデータ整合性",
latency: "平均5秒以内のWebhook→D1同期",
efficiency: "API制限の80%以下での安定運用"
}

ビジネスKPI

const businessKPIs = {
customerExperience: "マイページ表示3秒以内",
dataAccuracy: "注文・写真データ100%正確性",
systemReliability: "同期エラーによるCS問い合わせゼロ"
}

最終更新: 2025-08-23 18:14:53 JST
設計ステータス: 不整合ゼロアーキテクチャ設計完了
実装準備: Phase 1から段階的実装可能