メインコンテンツへスキップ
上級32分で読める

イベント駆動アーキテクチャ:非同期処理で実現する柔軟なシステム設計

イベント駆動アーキテクチャ(EDA)の基本概念から実装パターンまでを詳解。イベントソーシング、CQRSとの組み合わせ、実践的な設計指針を学びます。

システム開発イベント駆動アーキテクチャ非同期処理マイクロサービスシステム設計

🎯 この記事で学べること

  • 1
    イベント駆動アーキテクチャの基本概念と利点を理解できます
  • 2
    イベントソーシングとCQRSパターンの実装方法を学べます
  • 3
    非同期処理における課題と解決策を把握できます
  • 4
    実際のシステムでの設計指針と実装パターンを習得できます
  • 5
    メッセージキューとイベントストリーミングの使い分けを理解できます

読了時間: 約5

PayPalの45分間の沈黙

2022年10月、PayPalで奇妙な現象が発生した。

決済は成功する。お金も移動する。しかし、確認メールが届かない。取引履歴に表示されない。まるで決済が「なかったこと」になっていた。

45分間、世界中のユーザーが混乱に陥った。「私のお金はどこに消えた?」

原因は、同期処理の限界だった。

PayPalの古いシステムは、決済が成功すると、その場で10個以上の処理を順番に実行していた。メール送信、履歴更新、統計集計、不正検知、税務記録...。そのうちの一つ、メール送信サービスがダウンした瞬間、すべてが止まった。

この事件を機に、PayPalは大規模なアーキテクチャ改革に踏み切った。それが「イベント駆動アーキテクチャ」への移行だった。

新しいシステムでは、決済が成功したら「PaymentCompleted」というイベントを発行するだけ。メール送信も、履歴更新も、すべてがこのイベントを「聞いて」独立して動く。一つのサービスが死んでも、決済そのものは止まらない。

この決断により、PayPalの決済処理速度は3倍になり、システムの可用性は99.99%を達成した。

イベント駆動アーキテクチャは、現代のシステムが抱える「すべてを同時に、完璧に」という呪縛から解放される鍵なのだ。

LinkedInのメッセージ配送地獄

2012年、LinkedInは深刻な問題に直面していた。

ユーザーAがユーザーBにメッセージを送る。単純な機能のはずだった。しかし実際には、一つのメッセージ送信が引き起こす処理は膨大だった。

受信者への通知、送信者の送信履歴更新、スパムチェック、既読管理、モバイルプッシュ通知、メールダイジェスト更新、アクティビティフィード更新...。

エンジニアのサム・シャーは振り返る。「新しい通知チャンネルを追加するたびに、メッセージ送信のコードを変更していた。6ヶ月後、メッセージ送信の処理は2000行を超え、誰も全体を理解できなくなっていた」

さらに悪いことに、各処理の実行時間はバラバラだった。プッシュ通知は50ミリ秒、メール送信は2秒、スパムチェックは時に10秒以上。最も遅い処理に全体が引きずられ、ユーザーは「送信中...」の表示を眺め続けることになった。

解決策は、すべてを「イベント」として分離することだった。

// Before: 密結合の悪夢
async function sendMessage(message) {
  const savedMessage = await saveToDatabase(message);
  await updateSenderHistory(message.senderId);
  await checkSpam(message.content);
  await sendPushNotification(message.receiverId);
  await sendEmail(message.receiverId);
  await updateActivityFeed(message.receiverId);
  await updateDigest(message.receiverId);
  // ... さらに10個以上の処理
  return savedMessage;
}

// After: イベント駆動の美学
async function sendMessage(message) {
  const savedMessage = await saveToDatabase(message);
  await publishEvent('MessageSent', {
    messageId: savedMessage.id,
    senderId: message.senderId,
    receiverId: message.receiverId,
    content: message.content
  });
  return savedMessage; // 即座にレスポンス
}

// 各サービスが独立してイベントを処理
subscribeToEvent('MessageSent', async (event) => {
  await sendPushNotification(event.receiverId);
});

subscribeToEvent('MessageSent', async (event) => {
  await checkSpam(event.content);
});

結果は劇的だった。メッセージ送信のレスポンス時間は2秒から100ミリ秒に短縮。新機能の追加も、既存コードに触れることなく、新しいイベントリスナーを追加するだけになった。

しかし、これは始まりに過ぎなかった。

イベント駆動の本質は「責任の分離」です。各コンポーネントは自分の仕事だけに集中し、他の処理の完了を待つ必要がありません。

Uber:1秒間に100万イベントの狂気

Uberのシステムは、リアルタイムの塊だ。

ドライバーの位置情報は3秒ごとに更新される。世界中で400万人のドライバーがアクティブな時、それは1秒間に130万回の位置更新を意味する。さらに、乗車リクエスト、マッチング、料金計算、ルート最適化...。

2015年、Uberのモノリシックなシステムは限界に達していた。一つの乗車リクエストが、数十の同期的なAPI呼び出しを引き起こしていた。

元Uberエンジニアのマット・ランティは語る。「最悪だったのは、ドライバーと乗客のマッチングだった。近くのドライバーを検索し、各ドライバーの状態を確認し、最適なドライバーを選び、通知を送る。この間、乗客はずっと待たされる。ピーク時には30秒以上かかることもあった」

Uberが選んだ解決策は、徹底的なイベント駆動化だった。しかし、その道のりは困難を極めた。

第一の試練:イベントの爆発

最初の実装では、すべての状態変化をイベントとして発行した。

# ドライバーの状態変化イベント(初期実装)
events = [
    "DriverLocationUpdated",
    "DriverStatusChanged", 
    "DriverAcceptedRide",
    "DriverStartedRide",
    "DriverCompletedRide",
    "DriverRatedPassenger",
    # ... 50種類以上のイベント
]

1日で生成されるイベント数は100億を超えた。Kafkaクラスターは悲鳴を上げ、ストレージコストは天文学的な数字になった。

第二の試練:イベントの順序

分散システムでは、イベントの順序は保証されない。

ある日、奇妙なバグが報告された。「乗車が完了する前に、料金が請求された」

調査の結果、RideCompletedイベントより先にPaymentProcessedイベントが処理されていたことが判明した。ネットワークの遅延、サービスの処理速度の違い、様々な要因でイベントの順序は入れ替わる。

// 問題:イベントの順序が保証されない
// 実際の順序: StartRide -> CompleteRide -> ProcessPayment
// 処理された順序: ProcessPayment -> StartRide -> CompleteRide

// 解決策:イベントにシーケンス番号を付与
const event = {
  type: "RideCompleted",
  rideId: "ride_123",
  sequence: 1234567890, // タイムスタンプベースのシーケンス
  previousEventId: "evt_previous", // 因果関係の追跡
  data: { ... }
};

第三の試練:重複処理

ネットワークの不安定性により、同じイベントが複数回配信されることがあった。

「なぜ私は同じ乗車に2回請求されたのか?」

顧客からの苦情が殺到した。PaymentServiceが同じRideCompletedイベントを2回受信し、2回請求していたのだ。

解決策は、すべての処理を「冪等」にすることだった。

class PaymentService:
    def handle_ride_completed(self, event):
        # 冪等キーで重複チェック
        idempotency_key = f"payment_{event.ride_id}_{event.id}"
        
        if self.redis.exists(idempotency_key):
            logger.info(f"Event {event.id} already processed")
            return
        
        # 処理を実行
        payment = self.process_payment(event.ride_id)
        
        # 処理済みとしてマーク(24時間保持)
        self.redis.setex(idempotency_key, 86400, "processed")
        
        return payment

これらの試練を乗り越えた後、Uberのシステムは生まれ変わった。マッチング時間は30秒から3秒に短縮。システムの可用性は99.99%を達成。そして何より、新機能の開発速度が10倍になった。

Netflix:カオスエンジニアリングの生みの親

Netflixのイベント駆動システムは、意図的に「壊される」ことで有名だ。

2010年、NetflixはAWSへの移行を進めていた。クラウドの不安定性に対処するため、彼らは逆説的なアプローチを取った。「システムを強くするために、意図的に壊す」

Chaos Monkeyの誕生だ。

しかし、Chaos Monkeyが本当に威力を発揮したのは、Netflixがイベント駆動アーキテクチャを採用してからだった。

従来のシステムでは、一つのサービスが落ちると連鎖的に全体が停止する。しかし、イベント駆動では各サービスが独立している。一つが死んでも、他は動き続ける。

// Netflixのレジリエントなイベント処理
class ResilientEventHandler {
  async handleEvent(event) {
    const circuitBreaker = new CircuitBreaker({
      timeout: 3000,
      errorThreshold: 50,
      resetTimeout: 30000
    });
    
    try {
      // サーキットブレーカーで保護された処理
      await circuitBreaker.fire(async () => {
        await this.processEvent(event);
      });
    } catch (error) {
      if (error.code === 'CIRCUIT_OPEN') {
        // サービスが落ちている場合は後で再試行
        await this.scheduleRetry(event);
      } else {
        // その他のエラーはデッドレターキューへ
        await this.sendToDeadLetter(event, error);
      }
    }
  }
  
  async scheduleRetry(event) {
    // 指数バックオフで再試行
    const delay = Math.pow(2, event.retryCount || 0) * 1000;
    setTimeout(() => {
      this.handleEvent({
        ...event,
        retryCount: (event.retryCount || 0) + 1
      });
    }, delay);
  }
}

Netflixのエンジニア、ブレンダン・グレッグは語る。「イベント駆動にしたことで、私たちは『失敗を前提とした設計』が可能になった。サービスは落ちる。ネットワークは切れる。しかし、ショーは続けなければならない」

実際、2019年のAWS大規模障害の際も、Netflixは影響を最小限に抑えた。イベントがキューに溜まり、復旧後に自動的に処理された。ユーザーが気づいたのは、「いいね」の反映が数分遅れた程度だった。

イベントソーシング:GitHubが歴史を記録する方法

「なぜこのプルリクエストはマージされたのか?」

GitHubでは、すべての操作が「イベント」として記録される。コミット、プルリクエスト、レビュー、マージ。これらは単なるデータの更新ではなく、「出来事」として永続化される。

これが「イベントソーシング」だ。

従来のシステムは「現在の状態」だけを保存する。しかし、イベントソーシングは「すべての変更履歴」を保存する。

// 従来のCRUD
const user = {
  id: 123,
  name: "John Doe",
  email: "john@example.com"
};
// 更新すると前の値は失われる
user.email = "john.doe@example.com";

// イベントソーシング
const events = [
  { 
    type: "UserCreated", 
    timestamp: "2024-01-01T10:00:00Z",
    data: { id: 123, name: "John Doe", email: "john@example.com" }
  },
  {
    type: "EmailChanged",
    timestamp: "2024-06-15T14:30:00Z", 
    data: { id: 123, oldEmail: "john@example.com", newEmail: "john.doe@example.com" }
  }
];

// 任意の時点の状態を再現可能
function getStateAt(timestamp) {
  return events
    .filter(e => e.timestamp <= timestamp)
    .reduce((state, event) => applyEvent(state, event), {});
}

GitHubのシニアエンジニア、ジェシカ・ロードは説明する。「プルリクエストの画面で『2年前のコメント』が表示できるのは、イベントソーシングのおかげです。私たちは歴史を書き換えるのではなく、新しい章を追加していく」

しかし、イベントソーシングには代償もある。

ストレージの爆発

GitHubは1日に1億以上のイベントを生成する。すべてを永久保存すると、ストレージコストは指数関数的に増加する。

解決策は「スナップショット」だった。

// スナップショットによる最適化
class EventStore {
  async getAggregate(id) {
    // 最新のスナップショットを取得
    const snapshot = await this.getLatestSnapshot(id);
    
    let state = snapshot ? snapshot.state : {};
    let fromVersion = snapshot ? snapshot.version : 0;
    
    // スナップショット以降のイベントのみ再生
    const events = await this.getEvents(id, fromVersion);
    
    for (const event of events) {
      state = this.applyEvent(state, event);
    }
    
    // 100イベントごとに新しいスナップショット作成
    if (events.length > 100) {
      await this.createSnapshot(id, state, fromVersion + events.length);
    }
    
    return state;
  }
}

複雑なクエリ

「先月、最も多くレビューされたプルリクエストは?」

イベントストアはこのようなクエリには向いていない。すべてのイベントを読み込んで集計する必要がある。

そこでGitHubは「CQRS(コマンドクエリ責任分離)」を採用した。

# 書き込み側(イベントストア)
class PullRequestCommandHandler:
    def handle_create_pr(self, command):
        events = [
            PullRequestCreated(
                pr_id=command.pr_id,
                author=command.author,
                title=command.title
            )
        ]
        self.event_store.append(events)
        
# 読み取り側(最適化されたビュー)
class PullRequestProjection:
    def on_pull_request_created(self, event):
        # 検索用に最適化されたDBに保存
        self.read_db.pull_requests.insert({
            'id': event.pr_id,
            'author': event.author,
            'title': event.title,
            'created_at': event.timestamp,
            'review_count': 0
        })
    
    def on_review_submitted(self, event):
        # カウンターを更新
        self.read_db.pull_requests.update(
            {'id': event.pr_id},
            {'$inc': {'review_count': 1}}
        )

この設計により、GitHubは過去を完璧に記録しながら、高速なクエリも実現している。

イベントソーシングは強力ですが、すべてのシステムに適しているわけではありません。履歴が重要で、監査要件がある場合に最も価値を発揮します。

Slack:リアルタイムの魔法

Slackを開いた瞬間、魔法が始まる。

誰かがメッセージを送信する。1秒後、世界中の数百万のデバイスに表示される。タイピング中の表示、既読マーク、絵文字リアクション。すべてがリアルタイムだ。

しかし2014年、Slackはスケーラビリティの壁にぶつかっていた。

初期のアーキテクチャは単純だった。WebSocketで接続された各クライアントに、サーバーが直接メッセージを配信する。チームが10人なら問題ない。しかし、10万人のワークスペースでは?

Slackのエンジニア、イーサン・チェンは振り返る。「1つのメッセージが10万のWebSocket接続に配信される。サーバーは溶けた」

解決策は、イベントを「ファンアウト」することだった。

// Slackのメッセージ配信アーキテクチャ(簡略化)
class MessageFanout {
  async handleNewMessage(message) {
    // 1. メッセージをイベントとして発行
    const event = {
      type: 'message.posted',
      channel: message.channel,
      user: message.user,
      text: message.text,
      ts: message.timestamp
    };
    
    // 2. チャンネルメンバーを取得
    const members = await this.getChannelMembers(message.channel);
    
    // 3. メンバーごとにイベントをルーティング
    for (const batch of this.chunk(members, 1000)) {
      await this.publishToQueue('user-events', batch.map(userId => ({
        userId,
        event
      })));
    }
  }
}

// 各接続サーバーは自分のユーザーのイベントのみを処理
class ConnectionServer {
  constructor(serverId) {
    this.connections = new Map(); // userId -> WebSocket
    this.serverId = serverId;
  }
  
  async start() {
    // 自分が担当するユーザーのイベントのみ購読
    await this.subscribe(`server.${this.serverId}.events`, async (event) => {
      const connection = this.connections.get(event.userId);
      if (connection) {
        connection.send(JSON.stringify(event));
      }
    });
  }
}

しかし、これだけでは不十分だった。リアルタイムシステムには、さらなる課題があった。

順序保証の悪夢

ユーザーが素早く2つのメッセージを送信する。

Message 1: "会議を"
Message 2: "キャンセルします"

しかし、表示される順序が逆になったら?

Message 2: "キャンセルします"
Message 1: "会議を"

意味が真逆になってしまう。

Slackの解決策は、各チャンネルを「アクター」として扱うことだった。

// Akkaアクターによる順序保証
class ChannelActor extends Actor {
  var messageSequence = 0L
  
  def receive = {
    case PostMessage(text, userId) =>
      messageSequence += 1
      val message = Message(
        channelId = self.path.name,
        sequence = messageSequence,
        text = text,
        userId = userId,
        timestamp = System.currentTimeMillis
      )
      
      // 順序付けられたメッセージを配信
      eventBus ! MessagePosted(message)
      
      sender() ! MessagePosted(message)
  }
}

各チャンネルは単一のアクターで処理され、メッセージには連番が振られる。これにより、どんなに分散していても、順序は保証される。

プレゼンスという挑戦

「アクティブ」「退席中」「取り込み中」

これらの状態変化は、メッセージとは比較にならないほど頻繁だ。ユーザーがタブを切り替えるたび、マウスを動かすたび、状態は変化する。

100万人のユーザーが同時にアクティブな場合、プレゼンスイベントは毎秒数百万に達する。

// プレゼンスの最適化
class PresenceOptimizer {
  constructor() {
    this.lastUpdate = new Map();
    this.updateInterval = 30000; // 30秒
  }
  
  shouldUpdatePresence(userId, newStatus) {
    const last = this.lastUpdate.get(userId);
    if (!last) return true;
    
    // ステータスが変わった、または30秒経過
    return last.status !== newStatus || 
           Date.now() - last.timestamp > this.updateInterval;
  }
  
  async updatePresence(userId, status) {
    if (!this.shouldUpdatePresence(userId, status)) {
      return; // スキップ
    }
    
    this.lastUpdate.set(userId, {
      status,
      timestamp: Date.now()
    });
    
    // バッチ処理のためにキューに入れる
    await this.batchQueue.add({
      userId,
      status,
      timestamp: Date.now()
    });
  }
  
  // 100ミリ秒ごとにバッチ処理
  async processBatch() {
    const updates = await this.batchQueue.flush();
    if (updates.length === 0) return;
    
    // 同じユーザーの重複を除去
    const uniqueUpdates = this.deduplicateUpdates(updates);
    
    // 一度に配信
    await this.publishPresenceUpdate(uniqueUpdates);
  }
}

これらの最適化により、Slackは数百万の同時接続を処理しながら、ミリ秒単位のレイテンシを実現している。

Spotify:4億曲のシンフォニー

Spotifyで曲を再生する。この単純な行為の裏で、膨大なイベントが飛び交っている。

再生開始、一時停止、スキップ、シーク、完了。これらすべてがイベントとして記録される。なぜか?それは、Spotifyのビジネスモデルの中核だからだ。

アーティストへのロイヤリティ支払い、レコメンデーション、Discover Weekly、年間まとめ。すべてがこれらのイベントから生成される。

しかし、規模が尋常ではない。4億人のユーザーが、毎日数十億の再生イベントを生成する。

イベントの階層化

Spotifyのデータエンジニア、リサ・アンダーソンは説明する。「すべてのイベントが同じ重要度ではない。私たちは3層のイベントシステムを構築した」

# Tier 1: Critical Events(絶対に失ってはいけない)
class CriticalEventHandler:
    def handle_play_event(self, event):
        # ロイヤリティ計算に直結
        # 同期的にKafkaに書き込み、確認を待つ
        future = self.producer.send(
            'critical-plays',
            event,
            partition=self.get_partition(event.user_id)
        )
        future.get(timeout=10)  # 確実に永続化
        
# Tier 2: Important Events(失っても補完可能)
class ImportantEventHandler:
    def handle_search_event(self, event):
        # レコメンデーション改善に使用
        # 非同期でバッファリング
        self.buffer.add(event)
        if len(self.buffer) > 1000:
            self.flush_buffer()
            
# Tier 3: Nice-to-have Events(統計用)
class StatisticsEventHandler:
    def handle_ui_interaction(self, event):
        # サンプリングして送信
        if random.random() < 0.1:  # 10%サンプリング
            self.send_async(event)

グローバル分散の課題

Spotifyは世界中にユーザーがいる。東京のユーザーの再生イベントが、ストックホルムのデータセンターまで到達するには時間がかかる。

しかし、ユーザーは即座のフィードバックを期待する。「いいね」を押したら、すぐにハートが光るべきだ。

解決策は、「ローカルファースト、グローバルエベンチュアル」だった。

// エッジでの楽観的更新
class EdgeEventHandler {
  async handleLikeEvent(songId, userId) {
    // 1. ローカルで即座に反映(楽観的更新)
    await this.localCache.set(`like:${userId}:${songId}`, true);
    this.updateUI(songId, { liked: true });
    
    // 2. ローカルイベントストアに保存
    const event = {
      type: 'song.liked',
      songId,
      userId,
      timestamp: Date.now(),
      region: 'asia-northeast1'
    };
    await this.localEventStore.append(event);
    
    // 3. グローバルへの同期(非同期)
    this.globalSync.schedule(event);
  }
}

// グローバル同期サービス
class GlobalSyncService {
  async syncEvents() {
    const events = await this.localEventStore.getUnsynced();
    
    // リージョンごとにバッチ処理
    const batches = this.groupByRegion(events);
    
    for (const [region, regionEvents] of batches) {
      await this.crossRegionPublish(region, regionEvents);
    }
  }
  
  // コンフリクト解決
  resolveConflict(localEvent, globalEvent) {
    // Last Write Wins with Vector Clocks
    if (this.compareVectorClocks(localEvent.clock, globalEvent.clock) > 0) {
      return localEvent;
    }
    return globalEvent;
  }
}

この設計により、Spotifyは世界中で低レイテンシを実現しながら、最終的なデータ整合性も保証している。

分散トランザクション:Amazonの注文処理

「注文しました」のボタンを押す。

この瞬間、Amazonのシステムでは数十の処理が同時に走り出す。在庫確認、支払い処理、配送手配、推薦エンジン更新、売上計上...

しかし、これらすべてを同期的に処理していたら、注文完了まで数分かかってしまう。

Amazonの解決策は、「Saga」パターンだった。

// Amazon注文処理のSaga(簡略化)
class OrderSaga {
  constructor(orderId) {
    this.orderId = orderId;
    this.state = 'started';
    this.completedSteps = [];
  }
  
  async execute() {
    try {
      // Step 1: 在庫確保
      await this.reserveInventory();
      this.completedSteps.push('inventory_reserved');
      
      // Step 2: 支払い処理
      await this.processPayment();
      this.completedSteps.push('payment_processed');
      
      // Step 3: 配送手配
      await this.scheduleShipping();
      this.completedSteps.push('shipping_scheduled');
      
      // Step 4: 確認メール
      await this.sendConfirmation();
      this.completedSteps.push('confirmation_sent');
      
      this.state = 'completed';
      
    } catch (error) {
      // 失敗したら補償トランザクション実行
      await this.compensate(error);
      this.state = 'failed';
      throw error;
    }
  }
  
  async compensate(error) {
    // 完了したステップを逆順で取り消し
    const reverseSteps = [...this.completedSteps].reverse();
    
    for (const step of reverseSteps) {
      switch (step) {
        case 'shipping_scheduled':
          await this.cancelShipping();
          break;
        case 'payment_processed':
          await this.refundPayment();
          break;
        case 'inventory_reserved':
          await this.releaseInventory();
          break;
      }
    }
  }
}

しかし、Sagaパターンにも限界があった。複雑な依存関係、部分的な失敗、補償処理の失敗...

そこでAmazonは、より洗練されたアプローチを採用した。

イベントソースドSaga

各ステップの実行を「イベント」として記録し、失敗時はイベントを再生して状態を復元する。

class EventSourcedSaga:
    def __init__(self, saga_id):
        self.saga_id = saga_id
        self.events = []
        self.state = {}
        
    async def handle_command(self, command):
        if command.type == 'StartOrder':
            await self.publish_event(OrderStarted(
                order_id=command.order_id,
                customer_id=command.customer_id,
                items=command.items
            ))
            
    async def apply_event(self, event):
        if isinstance(event, OrderStarted):
            self.state['status'] = 'started'
            self.state['order_id'] = event.order_id
            
        elif isinstance(event, InventoryReserved):
            self.state['inventory_reserved'] = True
            
        elif isinstance(event, PaymentCompleted):
            self.state['payment_completed'] = True
            
        self.events.append(event)
        
    async def handle_inventory_response(self, response):
        if response.success:
            await self.publish_event(InventoryReserved(
                order_id=self.state['order_id'],
                items=response.reserved_items
            ))
            # 次のステップへ
            await self.request_payment()
        else:
            await self.publish_event(InventoryReservationFailed(
                order_id=self.state['order_id'],
                reason=response.error
            ))
            # Sagaを終了
            await self.complete_saga('failed')

この設計により、Amazonは99.99%の注文成功率を維持しながら、ピーク時には秒間数万の注文を処理している。

分散トランザクションの鍵は「補償可能性」です。各操作は取り消し可能でなければなりません。これにより、部分的な失敗から確実に回復できます。

CloudflareのエッジでのイベントCDN

2023年、Cloudflareは革新的なアーキテクチャを発表した。「Event CDN」だ。

従来のCDNは静的コンテンツをキャッシュする。しかし、Event CDNは「イベント」をエッジでキャッシュし、処理する。

// Cloudflare Workers でのイベント処理
export default {
  async fetch(request, env) {
    const event = await request.json();
    
    // エッジでイベントを処理
    if (event.type === 'page.view') {
      // 地理的に近いエッジで集計
      await env.ANALYTICS.increment(
        `views:${event.page}:${env.CF.country}`,
        1
      );
      
      // バッチングして中央に送信
      await env.EVENT_BATCH.add(event);
      
      if (await env.EVENT_BATCH.size() > 1000) {
        const batch = await env.EVENT_BATCH.flush();
        await sendToOrigin(batch);
      }
    }
    
    return new Response('OK');
  }
};

これにより、グローバルなイベント処理が劇的に高速化された。日本のユーザーのイベントは東京のエッジで処理され、必要な部分だけが本社に送信される。

イベント駆動の未来

2024年、イベント駆動アーキテクチャは新たな進化を遂げている。

AIとの融合

OpenAIやAnthropicのAPIは、本質的にイベント駆動だ。プロンプトを送信し、ストリーミングでレスポンスを受け取る。

// AI駆動のイベント処理
class AIEventEnricher {
  async processEvent(event) {
    if (event.type === 'customer.feedback') {
      // AIで感情分析
      const sentiment = await this.ai.analyze(event.text);
      
      // イベントを充実化
      const enrichedEvent = {
        ...event,
        sentiment: sentiment.score,
        topics: sentiment.topics,
        urgency: sentiment.urgency
      };
      
      // 緊急度に応じて異なるキューへ
      if (sentiment.urgency > 0.8) {
        await this.publish('urgent-feedback', enrichedEvent);
      } else {
        await this.publish('normal-feedback', enrichedEvent);
      }
    }
  }
}

エッジコンピューティングの普及

5Gの普及により、エッジでのイベント処理がさらに重要になる。自動運転車、IoTデバイス、AR/VRアプリケーション。すべてがリアルタイムのイベント処理を必要とする。

ブロックチェーンとの統合

イベントの改竄防止、分散合意形成。ブロックチェーンとイベント駆動の組み合わせは、新たな信頼モデルを生み出している。

あなたのシステムは「待たせて」いないか?

イベント駆動アーキテクチャの本質は、「待たない」ことだ。

ユーザーを待たせない。システムを待たせない。開発者を待たせない。

PayPalが45分間沈黙したのは、すべてが「待っていた」からだ。LinkedInのメッセージが遅かったのは、すべての処理の完了を「待っていた」からだ。

現代のシステムに求められるのは、リアルタイム性と柔軟性だ。ユーザーの期待は高まり続け、システムの複雑性は増し続ける。

イベント駆動は、この矛盾を解決する鍵となる。

しかし、銀の弾丸ではない。イベントの順序、重複処理、結果整合性。これらの課題と向き合う覚悟が必要だ。

あなたのシステムは、次のPayPalになる前に、イベント駆動への進化を始められるだろうか?

未来は、待ってくれない。