- Claude Codeでイベント駆動アーキテクチャのコードを自動生成・設計支援できる
- Pub/Sub・イベントソーシング・CQRSの3大パターンを実装例付きで徹底解説
- SES案件で需要急増中のイベント駆動スキルを効率的に習得しキャリアアップに直結
「マイクロサービス間の連携をどう設計すべきか」「非同期処理のイベント基盤を構築してほしい」——SES案件でこうした要件に直面するエンジニアが増えています。2026年現在、イベント駆動アーキテクチャ(EDA: Event-Driven Architecture)は、大規模システムにおけるスケーラビリティと疎結合性を実現する主要な設計パターンとして、多くの現場で採用されています。
しかし、イベント駆動アーキテクチャの設計・実装は複雑です。イベントの定義、メッセージブローカーの選定、イベントスキーマの管理、障害時のリカバリ戦略——考慮すべき要素が多岐にわたります。
この記事では、Claude Codeを活用してイベント駆動アーキテクチャを効率的に設計・実装する方法を、実践的なコード例とともに詳しく解説します。
- イベント駆動アーキテクチャの基礎概念と3つの主要パターン
- Claude Codeでイベントバスやメッセージングシステムを自動生成する方法
- イベントソーシング+CQRSの実装手順
- Kafka・RabbitMQ・Amazon EventBridgeの使い分け
- SES現場での実践パターンと年収アップの具体戦略
イベント駆動アーキテクチャとは?基礎から理解する
イベント駆動アーキテクチャの3つの構成要素
イベント駆動アーキテクチャは、イベントと呼ばれるメッセージを中心にシステム間の連携を行う設計パターンです。従来のリクエスト/レスポンス型との最大の違いは、サービス間の疎結合性にあります。
# イベント駆動の基本構成
┌──────────┐ Event ┌──────────────┐ Event ┌──────────┐
│ Producer │ ──────────→ │ Event Broker │ ──────────→ │ Consumer │
│ (発行者) │ │ (仲介者) │ │ (購読者) │
└──────────┘ └──────────────┘ └──────────┘
| 構成要素 | 役割 | 具体例 |
|---|---|---|
| Event Producer | イベントを発行する | 注文サービス、ユーザーサービス |
| Event Broker | イベントをルーティングする | Kafka、RabbitMQ、EventBridge |
| Event Consumer | イベントを受信して処理する | 在庫サービス、通知サービス |
なぜ今イベント駆動が求められるのか
2026年のSES市場において、イベント駆動アーキテクチャのスキルは高単価案件への直結要素です。
| 従来の同期通信 | イベント駆動 |
|---|---|
| サービスAがサービスBを直接呼ぶ | サービスAがイベントを発行するだけ |
| サービスBが停止するとAも影響 | サービスBが停止してもAは正常稼働 |
| スケーリングが困難 | コンシューマーを個別にスケール |
| 密結合(変更の影響が大) | 疎結合(独立してデプロイ可能) |
Claude Codeでイベント駆動のコード基盤を自動生成する
基本的なイベントバスの生成
Claude Codeにイベントバスの基盤コードを生成させる例を見てみましょう。
# Claude Codeへのプロンプト例
claude "TypeScriptでインメモリのイベントバスを実装してください。
要件:
- イベント名の型安全性を確保(ジェネリクス使用)
- subscribe/publish/unsubscribeメソッド
- ワイルドカードパターンマッチング対応
- エラーハンドリング(一つのハンドラーの失敗が他に影響しない)
- デッドレターキュー機能"
Claude Codeが生成するコードの例:
// src/infrastructure/event-bus.ts
type EventHandler<T = unknown> = (event: T) => Promise<void> | void;
interface EventEnvelope<T = unknown> {
id: string;
type: string;
timestamp: Date;
payload: T;
metadata: Record<string, string>;
}
interface DeadLetterEntry {
event: EventEnvelope;
error: Error;
handler: string;
failedAt: Date;
}
export class EventBus {
private handlers = new Map<string, Set<EventHandler>>();
private deadLetterQueue: DeadLetterEntry[] = [];
subscribe<T>(eventType: string, handler: EventHandler<T>): () => void {
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, new Set());
}
this.handlers.get(eventType)!.add(handler as EventHandler);
// unsubscribe関数を返す
return () => {
this.handlers.get(eventType)?.delete(handler as EventHandler);
};
}
async publish<T>(eventType: string, payload: T, metadata?: Record<string, string>): Promise<void> {
const envelope: EventEnvelope<T> = {
id: crypto.randomUUID(),
type: eventType,
timestamp: new Date(),
payload,
metadata: metadata ?? {},
};
const handlers = this.getMatchingHandlers(eventType);
await Promise.allSettled(
handlers.map(async (handler) => {
try {
await handler(envelope.payload);
} catch (error) {
this.deadLetterQueue.push({
event: envelope as EventEnvelope,
error: error instanceof Error ? error : new Error(String(error)),
handler: handler.name || 'anonymous',
failedAt: new Date(),
});
}
})
);
}
private getMatchingHandlers(eventType: string): EventHandler[] {
const matched: EventHandler[] = [];
for (const [pattern, handlerSet] of this.handlers) {
if (this.matchPattern(pattern, eventType)) {
matched.push(...handlerSet);
}
}
return matched;
}
private matchPattern(pattern: string, eventType: string): boolean {
if (pattern === eventType) return true;
if (pattern.endsWith('.*')) {
const prefix = pattern.slice(0, -2);
return eventType.startsWith(prefix);
}
return false;
}
getDeadLetters(): DeadLetterEntry[] {
return [...this.deadLetterQueue];
}
clearDeadLetters(): void {
this.deadLetterQueue = [];
}
}
Kafkaベースのイベントプロデューサー/コンシューマー生成
本番環境で多く使われるApache Kafkaとの連携コードも、Claude Codeで効率的に生成できます。
claude "KafkaJS(kafkajs)を使ったイベントプロデューサーとコンシューマーのTypeScriptコードを生成して。
要件:
- Avroスキーマでイベントのシリアライズ/デシリアライズ
- コンシューマーグループによる負荷分散
- リトライ機能(指数バックオフ)
- ヘルスチェックエンドポイント
- Graceful shutdown"
// src/infrastructure/kafka/producer.ts
import { Kafka, Producer, CompressionTypes } from 'kafkajs';
interface ProducerConfig {
brokers: string[];
clientId: string;
ssl?: boolean;
sasl?: {
mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
username: string;
password: string;
};
}
export class EventProducer {
private kafka: Kafka;
private producer: Producer;
private isConnected = false;
constructor(config: ProducerConfig) {
this.kafka = new Kafka({
clientId: config.clientId,
brokers: config.brokers,
ssl: config.ssl,
sasl: config.sasl,
});
this.producer = this.kafka.producer({
allowAutoTopicCreation: false,
transactionTimeout: 30000,
});
}
async connect(): Promise<void> {
await this.producer.connect();
this.isConnected = true;
console.log('Kafka Producer connected');
}
async publishEvent<T>(
topic: string,
key: string,
event: T,
headers?: Record<string, string>
): Promise<void> {
if (!this.isConnected) {
throw new Error('Producer is not connected');
}
await this.producer.send({
topic,
compression: CompressionTypes.GZIP,
messages: [
{
key,
value: JSON.stringify(event),
headers: {
'event-id': crypto.randomUUID(),
'event-timestamp': new Date().toISOString(),
...headers,
},
},
],
});
}
async disconnect(): Promise<void> {
await this.producer.disconnect();
this.isConnected = false;
}
}
// src/infrastructure/kafka/consumer.ts
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
interface ConsumerConfig {
brokers: string[];
groupId: string;
clientId: string;
}
type MessageHandler = (payload: {
topic: string;
key: string | null;
value: unknown;
headers: Record<string, string>;
}) => Promise<void>;
export class EventConsumer {
private kafka: Kafka;
private consumer: Consumer;
private handlers = new Map<string, MessageHandler>();
constructor(config: ConsumerConfig) {
this.kafka = new Kafka({
clientId: config.clientId,
brokers: config.brokers,
});
this.consumer = this.kafka.consumer({
groupId: config.groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
}
registerHandler(topic: string, handler: MessageHandler): void {
this.handlers.set(topic, handler);
}
async start(): Promise<void> {
await this.consumer.connect();
for (const topic of this.handlers.keys()) {
await this.consumer.subscribe({ topic, fromBeginning: false });
}
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
const handler = this.handlers.get(payload.topic);
if (!handler) return;
const headers: Record<string, string> = {};
if (payload.message.headers) {
for (const [k, v] of Object.entries(payload.message.headers)) {
if (v) headers[k] = v.toString();
}
}
await this.withRetry(() =>
handler({
topic: payload.topic,
key: payload.message.key?.toString() ?? null,
value: JSON.parse(payload.message.value?.toString() ?? '{}'),
headers,
}),
3
);
},
});
}
private async withRetry(fn: () => Promise<void>, maxRetries: number): Promise<void> {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
await fn();
return;
} catch (error) {
if (attempt === maxRetries) throw error;
const delay = Math.pow(2, attempt) * 1000;
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
async stop(): Promise<void> {
await this.consumer.disconnect();
}
}
イベントソーシング(Event Sourcing)の実装
イベントソーシングとは
イベントソーシングは、状態の変更をすべてイベントとして記録し、現在の状態をイベントの再生(リプレイ)から導出するパターンです。
# 従来のCRUDアプローチ
UPDATE orders SET status = 'shipped' WHERE id = 123;
→ 変更前の情報は失われる
# イベントソーシング
Event 1: OrderCreated { orderId: 123, items: [...] }
Event 2: PaymentReceived { orderId: 123, amount: 5000 }
Event 3: OrderShipped { orderId: 123, trackingId: "JP123..." }
→ すべての履歴が保存される
Claude Codeでイベントストアを実装する
claude "イベントソーシングのイベントストアをTypeScriptで実装して。
要件:
- PostgreSQLをバックエンドに使用
- 楽観的ロック(バージョニング)
- スナップショット機能(N件ごとに自動作成)
- イベントのリプレイ機能
- 監査ログとの統合"
// src/infrastructure/event-store.ts
import { Pool } from 'pg';
interface DomainEvent {
type: string;
data: Record<string, unknown>;
metadata?: Record<string, unknown>;
}
interface StoredEvent extends DomainEvent {
eventId: string;
aggregateId: string;
aggregateType: string;
version: number;
timestamp: Date;
}
interface Snapshot {
aggregateId: string;
aggregateType: string;
version: number;
state: Record<string, unknown>;
createdAt: Date;
}
export class EventStore {
private pool: Pool;
private snapshotInterval: number;
constructor(pool: Pool, snapshotInterval = 50) {
this.pool = pool;
this.snapshotInterval = snapshotInterval;
}
async initialize(): Promise<void> {
await this.pool.query(`
CREATE TABLE IF NOT EXISTS events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
version INTEGER NOT NULL,
type VARCHAR(255) NOT NULL,
data JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
timestamp TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(aggregate_id, version)
);
CREATE TABLE IF NOT EXISTS snapshots (
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
version INTEGER NOT NULL,
state JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY(aggregate_id, version)
);
CREATE INDEX IF NOT EXISTS idx_events_aggregate
ON events(aggregate_id, version);
`);
}
async appendEvents(
aggregateId: string,
aggregateType: string,
events: DomainEvent[],
expectedVersion: number
): Promise<StoredEvent[]> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// 楽観的ロック: 現在のバージョンを確認
const { rows } = await client.query(
'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',
[aggregateId]
);
const currentVersion = rows[0]?.current_version ?? 0;
if (currentVersion !== expectedVersion) {
throw new Error(
`Concurrency conflict: expected version ${expectedVersion}, got ${currentVersion}`
);
}
const stored: StoredEvent[] = [];
for (let i = 0; i < events.length; i++) {
const version = expectedVersion + i + 1;
const result = await client.query(
`INSERT INTO events (aggregate_id, aggregate_type, version, type, data, metadata)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *`,
[
aggregateId,
aggregateType,
version,
events[i].type,
JSON.stringify(events[i].data),
JSON.stringify(events[i].metadata ?? {}),
]
);
stored.push(this.mapToStoredEvent(result.rows[0]));
}
// スナップショットの自動作成
const newVersion = expectedVersion + events.length;
if (newVersion % this.snapshotInterval === 0) {
const allEvents = await this.getEvents(aggregateId, client);
const state = this.replayEvents(allEvents);
await client.query(
`INSERT INTO snapshots (aggregate_id, aggregate_type, version, state)
VALUES ($1, $2, $3, $4)
ON CONFLICT (aggregate_id, version) DO NOTHING`,
[aggregateId, aggregateType, newVersion, JSON.stringify(state)]
);
}
await client.query('COMMIT');
return stored;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async getEvents(aggregateId: string, client?: any): Promise<StoredEvent[]> {
const conn = client ?? this.pool;
// まずスナップショットを確認
const snapshotResult = await conn.query(
`SELECT * FROM snapshots WHERE aggregate_id = $1
ORDER BY version DESC LIMIT 1`,
[aggregateId]
);
let fromVersion = 0;
if (snapshotResult.rows.length > 0) {
fromVersion = snapshotResult.rows[0].version;
}
const { rows } = await conn.query(
`SELECT * FROM events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version ASC`,
[aggregateId, fromVersion]
);
return rows.map(this.mapToStoredEvent);
}
private replayEvents(events: StoredEvent[]): Record<string, unknown> {
return events.reduce((state, event) => {
return { ...state, ...event.data, _version: event.version };
}, {} as Record<string, unknown>);
}
private mapToStoredEvent(row: any): StoredEvent {
return {
eventId: row.event_id,
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
version: row.version,
type: row.type,
data: row.data,
metadata: row.metadata,
timestamp: row.timestamp,
};
}
}
CQRSパターン(Command Query Responsibility Segregation)
CQRSとイベント駆動の相性
CQRSは、書き込み(Command)と読み取り(Query)の責務を分離するパターンです。イベントソーシングと組み合わせることで、高いスケーラビリティと柔軟なクエリモデルを実現できます。
# CQRSアーキテクチャ
┌─────────────────┐
Command ────────→ │ Write Model │
(注文作成) │ (Event Store) │
└────────┬────────┘
│ Event
↓
┌─────────────────┐
│ Event Handler │
│ (プロジェクション)│
└────────┬────────┘
│ Update
↓
Query ────────→ ┌─────────────────┐
(注文一覧取得) │ Read Model │
│ (最適化DB) │
└─────────────────┘
Claude Codeで実装するCQRS基盤
claude "CQRSパターンをTypeScriptで実装して。
コマンドバス + クエリバス + プロジェクション(読み取りモデル自動更新)の3層構成で。
具体的なEC注文のユースケースで示して。"
// src/cqrs/command-bus.ts
interface Command {
type: string;
}
type CommandHandler<T extends Command> = (command: T) => Promise<void>;
export class CommandBus {
private handlers = new Map<string, CommandHandler<any>>();
register<T extends Command>(commandType: string, handler: CommandHandler<T>): void {
this.handlers.set(commandType, handler);
}
async dispatch<T extends Command>(command: T): Promise<void> {
const handler = this.handlers.get(command.type);
if (!handler) {
throw new Error(`No handler registered for command: ${command.type}`);
}
await handler(command);
}
}
// src/cqrs/query-bus.ts
interface Query {
type: string;
}
type QueryHandler<T extends Query, R> = (query: T) => Promise<R>;
export class QueryBus {
private handlers = new Map<string, QueryHandler<any, any>>();
register<T extends Query, R>(queryType: string, handler: QueryHandler<T, R>): void {
this.handlers.set(queryType, handler);
}
async execute<T extends Query, R>(query: T): Promise<R> {
const handler = this.handlers.get(query.type);
if (!handler) {
throw new Error(`No handler registered for query: ${query.type}`);
}
return handler(query);
}
}
// src/cqrs/projections/order-projection.ts
interface OrderReadModel {
orderId: string;
customerId: string;
status: string;
totalAmount: number;
items: Array<{ productId: string; quantity: number; price: number }>;
createdAt: Date;
updatedAt: Date;
}
export class OrderProjection {
private readModels = new Map<string, OrderReadModel>();
async handleEvent(event: { type: string; data: any }): Promise<void> {
switch (event.type) {
case 'OrderCreated':
this.readModels.set(event.data.orderId, {
orderId: event.data.orderId,
customerId: event.data.customerId,
status: 'pending',
totalAmount: event.data.totalAmount,
items: event.data.items,
createdAt: new Date(),
updatedAt: new Date(),
});
break;
case 'PaymentReceived':
const order = this.readModels.get(event.data.orderId);
if (order) {
order.status = 'paid';
order.updatedAt = new Date();
}
break;
case 'OrderShipped':
const shipped = this.readModels.get(event.data.orderId);
if (shipped) {
shipped.status = 'shipped';
shipped.updatedAt = new Date();
}
break;
}
}
getOrder(orderId: string): OrderReadModel | undefined {
return this.readModels.get(orderId);
}
getOrdersByCustomer(customerId: string): OrderReadModel[] {
return Array.from(this.readModels.values())
.filter(o => o.customerId === customerId);
}
getOrdersByStatus(status: string): OrderReadModel[] {
return Array.from(this.readModels.values())
.filter(o => o.status === status);
}
}
実践:EC注文システムをイベント駆動で構築する
システム全体のイベントフロー
ECサイトの注文処理を題材に、イベント駆動アーキテクチャの実践的な実装を見てみましょう。
顧客が注文 → OrderCreated
↓
決済サービスが受信 → PaymentProcessed
↓
在庫サービスが受信 → InventoryReserved
↓
配送サービスが受信 → ShipmentCreated
↓
通知サービスが受信 → NotificationSent
Claude Codeでドメインイベントを定義する
claude "ECサイトの注文ドメインにおけるイベント定義をTypeScriptで作成して。
以下のイベントを含めて:
- OrderCreated, OrderCancelled
- PaymentProcessed, PaymentFailed
- InventoryReserved, InventoryShortage
- ShipmentCreated, ShipmentDelivered
各イベントにはスキーマバージョニングを入れて。"
// src/domain/events/order-events.ts
interface BaseEvent {
eventId: string;
occurredAt: Date;
schemaVersion: number;
}
export interface OrderCreated extends BaseEvent {
type: 'OrderCreated';
schemaVersion: 1;
data: {
orderId: string;
customerId: string;
items: Array<{
productId: string;
productName: string;
quantity: number;
unitPrice: number;
}>;
totalAmount: number;
shippingAddress: {
postalCode: string;
prefecture: string;
city: string;
line1: string;
line2?: string;
};
};
}
export interface PaymentProcessed extends BaseEvent {
type: 'PaymentProcessed';
schemaVersion: 1;
data: {
orderId: string;
paymentId: string;
amount: number;
method: 'credit_card' | 'bank_transfer' | 'convenience_store';
processedAt: Date;
};
}
export interface InventoryReserved extends BaseEvent {
type: 'InventoryReserved';
schemaVersion: 1;
data: {
orderId: string;
reservations: Array<{
productId: string;
quantity: number;
warehouseId: string;
}>;
};
}
export interface ShipmentCreated extends BaseEvent {
type: 'ShipmentCreated';
schemaVersion: 1;
data: {
orderId: string;
shipmentId: string;
carrier: string;
trackingNumber: string;
estimatedDelivery: Date;
};
}
export type OrderDomainEvent =
| OrderCreated
| PaymentProcessed
| InventoryReserved
| ShipmentCreated;
Sagaパターンで分散トランザクションを管理する
イベント駆動アーキテクチャでは、複数サービスにまたがるトランザクションの管理が課題になります。Sagaパターンは、各ステップの補償アクション(ロールバック)を定義することで、分散トランザクションの整合性を保証します。
claude "Sagaパターン(オーケストレーション型)をTypeScriptで実装して。
注文作成→決済→在庫確保→配送の4ステップで、
各ステップに補償アクション(ロールバック)を定義して。"
// src/sagas/order-saga.ts
interface SagaStep {
name: string;
execute: (context: SagaContext) => Promise<void>;
compensate: (context: SagaContext) => Promise<void>;
}
interface SagaContext {
orderId: string;
data: Record<string, unknown>;
completedSteps: string[];
}
export class SagaOrchestrator {
private steps: SagaStep[] = [];
addStep(step: SagaStep): this {
this.steps.push(step);
return this;
}
async execute(initialContext: Omit<SagaContext, 'completedSteps'>): Promise<void> {
const context: SagaContext = {
...initialContext,
completedSteps: [],
};
for (const step of this.steps) {
try {
console.log(`Executing step: ${step.name}`);
await step.execute(context);
context.completedSteps.push(step.name);
} catch (error) {
console.error(`Step ${step.name} failed:`, error);
await this.compensate(context);
throw new Error(`Saga failed at step: ${step.name}`);
}
}
}
private async compensate(context: SagaContext): Promise<void> {
// 完了済みステップを逆順で補償
const stepsToCompensate = [...context.completedSteps].reverse();
for (const stepName of stepsToCompensate) {
const step = this.steps.find(s => s.name === stepName);
if (step) {
try {
console.log(`Compensating step: ${step.name}`);
await step.compensate(context);
} catch (error) {
console.error(`Compensation failed for ${step.name}:`, error);
// 補償失敗はログして続行(最善努力)
}
}
}
}
}
// 注文Sagaの構築
const orderSaga = new SagaOrchestrator()
.addStep({
name: 'createOrder',
execute: async (ctx) => {
// 注文レコードをDBに作成
console.log(`Creating order: ${ctx.orderId}`);
},
compensate: async (ctx) => {
// 注文をキャンセル状態に更新
console.log(`Cancelling order: ${ctx.orderId}`);
},
})
.addStep({
name: 'processPayment',
execute: async (ctx) => {
// 決済APIを呼び出し
console.log(`Processing payment for: ${ctx.orderId}`);
},
compensate: async (ctx) => {
// 返金処理
console.log(`Refunding payment for: ${ctx.orderId}`);
},
})
.addStep({
name: 'reserveInventory',
execute: async (ctx) => {
// 在庫を確保
console.log(`Reserving inventory for: ${ctx.orderId}`);
},
compensate: async (ctx) => {
// 在庫確保を解放
console.log(`Releasing inventory for: ${ctx.orderId}`);
},
})
.addStep({
name: 'createShipment',
execute: async (ctx) => {
// 配送手配
console.log(`Creating shipment for: ${ctx.orderId}`);
},
compensate: async (ctx) => {
// 配送キャンセル
console.log(`Cancelling shipment for: ${ctx.orderId}`);
},
});

メッセージブローカーの選定ガイド
Kafka vs RabbitMQ vs Amazon EventBridge
SES案件では、プロジェクトの要件に応じてメッセージブローカーを使い分ける必要があります。
| 観点 | Apache Kafka | RabbitMQ | Amazon EventBridge |
|---|---|---|---|
| スループット | 非常に高い(100万msg/秒) | 中程度(数万msg/秒) | 高い(サーバーレス) |
| メッセージ保持 | 永続的(設定可能) | 消費後に削除 | 24時間(アーカイブ可) |
| 順序保証 | パーティション内で保証 | キュー内で保証 | 保証なし |
| ユースケース | ログ収集・ストリーム処理 | タスクキュー・RPC | AWS統合・サーバーレス |
| SES案件での頻度 | ★★★★★ | ★★★★☆ | ★★★☆☆ |
| 学習コスト | 高い | 中程度 | 低い |
Claude Codeで適切なブローカーを選定する
claude "以下のシステム要件に最適なメッセージブローカーを選定して理由を説明して:
- 1日あたり500万イベント
- イベントの順序保証が必要
- 30日間のイベント保持が必要
- AWS環境で運用
- チーム5名でKafka経験者が1名"
デッドレターキューとエラーハンドリング
障害に強いイベント処理の実装
イベント駆動システムでは、メッセージの処理失敗への対策が不可欠です。
// src/infrastructure/resilient-consumer.ts
interface RetryPolicy {
maxRetries: number;
initialDelayMs: number;
maxDelayMs: number;
backoffMultiplier: number;
}
export class ResilientEventConsumer {
private retryPolicy: RetryPolicy;
private deadLetterHandler: (event: unknown, error: Error) => Promise<void>;
constructor(
retryPolicy: RetryPolicy,
deadLetterHandler: (event: unknown, error: Error) => Promise<void>
) {
this.retryPolicy = retryPolicy;
this.deadLetterHandler = deadLetterHandler;
}
async processWithRetry<T>(
event: T,
handler: (event: T) => Promise<void>
): Promise<void> {
let lastError: Error | null = null;
for (let attempt = 0; attempt <= this.retryPolicy.maxRetries; attempt++) {
try {
await handler(event);
return; // 成功
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error));
if (attempt < this.retryPolicy.maxRetries) {
const delay = Math.min(
this.retryPolicy.initialDelayMs *
Math.pow(this.retryPolicy.backoffMultiplier, attempt),
this.retryPolicy.maxDelayMs
);
console.warn(
`Retry ${attempt + 1}/${this.retryPolicy.maxRetries} ` +
`after ${delay}ms for event`
);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
// 全リトライ失敗 → デッドレターキューに送信
console.error('All retries exhausted, sending to DLQ');
await this.deadLetterHandler(event, lastError!);
}
}
SES現場でのイベント駆動アーキテクチャ活用パターン
パターン1: ECサイトの注文管理(高需要)
2026年のSES市場で最も多いイベント駆動の案件は、ECサイトの注文管理システムのリアーキテクトです。
必要スキル: Kafka + TypeScript/Java + Docker
想定単価: 75-95万円/月
案件期間: 6ヶ月〜1年
パターン2: IoTデータ収集パイプライン
センサーデータをリアルタイムに収集・処理するパイプラインも、イベント駆動アーキテクチャの代表的なユースケースです。
必要スキル: Kafka/Kinesis + Python + AWS
想定単価: 80-100万円/月
案件期間: 3ヶ月〜6ヶ月
パターン3: 金融系リアルタイム取引処理
金融システムでは、イベントソーシング+CQRSの導入案件が増加しています。
必要スキル: Event Sourcing + CQRS + Java/Kotlin
想定単価: 90-120万円/月
案件期間: 1年以上
Claude Codeでイベント駆動設計を効率化するベストプラクティス
1. CLAUDE.mdにイベント設計規約を定義する
# Event-Driven Architecture Rules
## イベント命名規則
- 過去形を使う(OrderCreated, NOT CreateOrder)
- ドメイン名をプレフィックスにする(Order.Created)
## イベントスキーマ
- 必須フィールド: eventId, type, occurredAt, schemaVersion
- ペイロードはdata配下にネスト
- 後方互換性のためフィールドの削除は禁止
## テスト要件
- イベント発行のユニットテスト必須
- Sagaの補償アクションのテスト必須
- イベントのシリアライズ/デシリアライズのテスト必須
2. イベントカタログの自動生成
claude "プロジェクト内のすべてのドメインイベントを収集して、
イベントカタログ(一覧ドキュメント)をMarkdown形式で自動生成して。
各イベントのスキーマ、発行元、購読先も含めて。"
3. テストの自動生成
claude "以下のイベントハンドラーに対する統合テストを生成して:
- OrderCreatedハンドラー → 在庫確認 + 決済開始
- PaymentFailedハンドラー → 注文キャンセル + 在庫解放
テストにはイベントの再生(リプレイ)テストも含めて。"
まとめ
Claude Codeを活用したイベント駆動アーキテクチャの設計・実装について、主要なパターンと実践テクニックを解説しました。
- イベント駆動アーキテクチャは疎結合・スケーラビリティ・障害耐性の3つを実現する
- Claude Codeでイベントバス、プロデューサー/コンシューマー、Sagaパターンの実装コードを効率的に生成できる
- イベントソーシング+CQRSの組み合わせは高難度だが、Claude Codeの支援で実装ハードルが大幅に下がる
- Kafka、RabbitMQ、EventBridgeは要件に応じて使い分ける
- SES案件ではイベント駆動スキルが75〜120万円/月の高単価案件に直結する
次のステップ: イベント駆動アーキテクチャの基礎を押さえたら、Claude Codeでマイクロサービスを設計する方法で、より大規模なシステム設計に挑戦してみてください。
関連記事: