𝕏 f B! L
案件・求人数 12,345
案件を探す(準備中) エージェントを探す(準備中) お役立ち情報 ログイン
案件・求人数 12,345
Google Cloud Pub/Sub入門|SESエンジニア向けメッセージング実践ガイド

Google Cloud Pub/Sub入門|SESエンジニア向けメッセージング実践ガイド

Google CloudPub/SubメッセージングSESエンジニア
目次
⚡ 3秒でわかる!この記事のポイント
  • Pub/Subはフルマネージドのリアルタイムメッセージングサービス
  • マイクロサービス間の非同期連携に最適で大規模処理に強い
  • イベント駆動アーキテクチャの実装スキルは高単価案件で必須

「マイクロサービス間の通信をどう設計すればいいかわからない…」「同期処理だとスケーリングに限界がある」「イベント駆動アーキテクチャの実装経験を積みたい」

Google Cloud Pub/Subは、フルマネージドの非同期メッセージングサービスです。サービス間の疎結合を実現し、大規模なイベント駆動アーキテクチャの基盤として利用されています。

この記事では、Google Cloud完全攻略シリーズEp.12として、Pub/Subの基礎から実践的なメッセージングパターンまで、SESエンジニアが現場で即活用できる知識を解説します。

この記事でわかること
  • Pub/Subの基本概念とアーキテクチャ
  • トピック・サブスクリプションの設計
  • メッセージングパターンの実装
  • デッドレターキューとエラーハンドリング
  • マイクロサービス連携の設計

Google Cloud Pub/Subとは

Google Cloud Pub/Subによるメッセージング全体像

Pub/Sub(Publish/Subscribe)は、Publisher(発行者)がメッセージを送信し、Subscriber(購読者)がそのメッセージを受信する非同期メッセージングモデルです。PublisherとSubscriberは互いを知らなくても通信でき、これが疎結合の実現に繋がります。

Pub/Subの主な特徴

特徴内容
フルマネージドインフラ管理不要、自動スケーリング
グローバル世界中のリージョンで低レイテンシ配信
高スループット毎秒数百万メッセージを処理可能
At-least-onceメッセージの確実な配信保証
メッセージ保持最大31日間のメッセージ保持
Push/Pull両方の配信モードに対応

基本コンポーネント

Publisher → Topic → Subscription → Subscriber

Publisher: メッセージを送信する側
Topic: メッセージのカテゴリ(チャネル)
Subscription: Topicに対する購読設定
Subscriber: メッセージを受信・処理する側

Pub/Subの基本操作

トピックとサブスクリプションの作成

# gcloud CLIでの操作

# トピック作成
gcloud pubsub topics create order-events

# サブスクリプション作成(Pull型)
gcloud pubsub subscriptions create order-processor \
  --topic=order-events \
  --ack-deadline=60 \
  --message-retention-duration=7d

# サブスクリプション作成(Push型)
gcloud pubsub subscriptions create order-webhook \
  --topic=order-events \
  --push-endpoint=https://api.example.com/webhook/orders \
  --ack-deadline=30

メッセージの発行と受信

# メッセージ発行
gcloud pubsub topics publish order-events \
  --message='{"orderId": "12345", "event": "created", "amount": 5000}' \
  --attribute=event_type=order_created,priority=high

# メッセージ受信(Pull)
gcloud pubsub subscriptions pull order-processor --limit=10 --auto-ack

Node.js SDKでの実装

// Publisher(メッセージ発行側)
import { PubSub } from '@google-cloud/pubsub';

const pubsub = new PubSub({ projectId: 'my-project' });

async function publishOrderEvent(order: Order) {
  const topic = pubsub.topic('order-events');

  const message = {
    data: Buffer.from(JSON.stringify({
      orderId: order.id,
      event: 'created',
      amount: order.amount,
      customerId: order.customerId,
      items: order.items,
      timestamp: new Date().toISOString(),
    })),
    attributes: {
      event_type: 'order_created',
      priority: order.amount > 100000 ? 'high' : 'normal',
    },
  };

  const messageId = await topic.publishMessage(message);
  console.log(`Message published: ${messageId}`);
  return messageId;
}
// Subscriber(メッセージ受信側)
import { PubSub, Message } from '@google-cloud/pubsub';

const pubsub = new PubSub({ projectId: 'my-project' });

async function startOrderProcessor() {
  const subscription = pubsub.subscription('order-processor');

  subscription.on('message', async (message: Message) => {
    try {
      const data = JSON.parse(message.data.toString());
      console.log(`Processing order: ${data.orderId}`);

      // 注文処理ロジック
      await processOrder(data);

      // 処理成功 → ACK
      message.ack();
      console.log(`Order ${data.orderId} processed successfully`);
    } catch (error) {
      console.error(`Error processing order: ${error.message}`);
      // 処理失敗 → NACK(リトライ)
      message.nack();
    }
  });

  subscription.on('error', (error) => {
    console.error('Subscription error:', error);
  });

  console.log('Order processor started, waiting for messages...');
}

メッセージングパターン

ファンアウト(Fan-out)

1つのメッセージを複数のサービスに同時配信:

                  ┌─ Sub A → メール送信サービス

Topic: orders ──┼─ Sub B → 在庫管理サービス

                  └─ Sub C → 分析サービス
// 注文イベントを1回publishするだけで、3つのサービスが独立して処理
async function onOrderCreated(order: Order) {
  // 1つのTopicにpublish
  await publishOrderEvent(order);

  // Sub A: メール送信サービスが自動受信 → 確認メール送信
  // Sub B: 在庫管理サービスが自動受信 → 在庫引き当て
  // Sub C: 分析サービスが自動受信 → データ記録
}

負荷分散(Load Balancing)

複数のワーカーで1つのサブスクリプションを共有し、メッセージを分散処理:

// ワーカー1, 2, 3 が同じサブスクリプションをpull
// Pub/Subが自動的にメッセージを分散
async function startWorker(workerId: number) {
  const subscription = pubsub.subscription('heavy-processing', {
    flowControl: {
      maxMessages: 10, // 同時処理数を制限
      allowExcessMessages: false,
    },
  });

  subscription.on('message', async (message: Message) => {
    console.log(`Worker ${workerId} processing: ${message.id}`);
    await processHeavyTask(JSON.parse(message.data.toString()));
    message.ack();
  });
}

// 3つのワーカーを起動
for (let i = 1; i <= 3; i++) {
  startWorker(i);
}

メッセージフィルタリング

属性に基づいてメッセージをフィルタリング:

# 高優先度メッセージのみ受信するサブスクリプション
gcloud pubsub subscriptions create high-priority-orders \
  --topic=order-events \
  --message-filter='attributes.priority = "high"'

# 特定イベントタイプのみ受信
gcloud pubsub subscriptions create order-cancellations \
  --topic=order-events \
  --message-filter='attributes.event_type = "order_cancelled"'

デッドレターキュー(DLQ)

DLQの設計

処理失敗したメッセージを隔離するDLQの設定:

# デッドレタートピック作成
gcloud pubsub topics create order-events-dlq

# DLQ付きサブスクリプション
gcloud pubsub subscriptions create order-processor \
  --topic=order-events \
  --dead-letter-topic=order-events-dlq \
  --max-delivery-attempts=5 \
  --ack-deadline=60

DLQ処理の実装

// DLQモニタリング
async function monitorDeadLetters() {
  const dlqSubscription = pubsub.subscription('order-events-dlq-sub');

  dlqSubscription.on('message', async (message: Message) => {
    const deliveryAttempt = message.deliveryAttempt;
    const data = JSON.parse(message.data.toString());

    console.error(`Dead letter received (attempts: ${deliveryAttempt}):`, data);

    // アラート送信
    await sendAlert({
      severity: 'warning',
      title: 'メッセージ処理失敗',
      details: `注文 ${data.orderId} の処理が${deliveryAttempt}回失敗しました`,
    });

    // 手動確認用にDBに保存
    await saveToFailedMessages({
      messageId: message.id,
      data,
      error: message.attributes?.error_reason,
      deliveryAttempt,
    });

    message.ack();
  });
}

Terraformでのインフラ構築

# Terraformによるフル構成

# メイントピック
resource "google_pubsub_topic" "order_events" {
  name = "order-events"

  message_retention_duration = "604800s" # 7日

  schema_settings {
    schema   = google_pubsub_schema.order_event.id
    encoding = "JSON"
  }
}

# スキーマ定義(メッセージ検証)
resource "google_pubsub_schema" "order_event" {
  name       = "order-event-schema"
  type       = "AVRO"
  definition = jsonencode({
    type = "record"
    name = "OrderEvent"
    fields = [
      { name = "orderId", type = "string" },
      { name = "event", type = "string" },
      { name = "amount", type = "int" },
      { name = "timestamp", type = "string" },
    ]
  })
}

# Pullサブスクリプション
resource "google_pubsub_subscription" "order_processor" {
  name  = "order-processor"
  topic = google_pubsub_topic.order_events.id

  ack_deadline_seconds       = 60
  message_retention_duration = "604800s"

  retry_policy {
    minimum_backoff = "10s"
    maximum_backoff = "600s"
  }

  dead_letter_policy {
    dead_letter_topic     = google_pubsub_topic.order_events_dlq.id
    max_delivery_attempts = 5
  }

  expiration_policy {
    ttl = "" # 無期限
  }
}

# デッドレタートピック
resource "google_pubsub_topic" "order_events_dlq" {
  name = "order-events-dlq"
}

# Pushサブスクリプション(Cloud Run向け)
resource "google_pubsub_subscription" "order_webhook" {
  name  = "order-webhook"
  topic = google_pubsub_topic.order_events.id

  push_config {
    push_endpoint = google_cloud_run_service.order_handler.status[0].url
    oidc_token {
      service_account_email = google_service_account.pubsub_invoker.email
    }
  }
}

Cloud Functionsとの連携

イベントトリガー関数

// Cloud Functions(第2世代)でのPub/Subトリガー
import { CloudEvent } from '@google-cloud/functions-framework';

interface OrderEventData {
  orderId: string;
  event: string;
  amount: number;
  customerId: string;
}

export async function processOrderEvent(cloudEvent: CloudEvent<{ message: { data: string } }>) {
  const base64Data = cloudEvent.data.message.data;
  const data: OrderEventData = JSON.parse(
    Buffer.from(base64Data, 'base64').toString()
  );

  console.log(`Processing ${data.event} for order ${data.orderId}`);

  switch (data.event) {
    case 'created':
      await handleOrderCreated(data);
      break;
    case 'paid':
      await handleOrderPaid(data);
      break;
    case 'shipped':
      await handleOrderShipped(data);
      break;
    case 'cancelled':
      await handleOrderCancelled(data);
      break;
    default:
      console.warn(`Unknown event type: ${data.event}`);
  }
}

デプロイ

gcloud functions deploy processOrderEvent \
  --gen2 \
  --runtime=nodejs20 \
  --region=asia-northeast1 \
  --trigger-topic=order-events \
  --entry-point=processOrderEvent \
  --memory=256MB \
  --timeout=60s

マイクロサービス設計パターン

Sagaパターン(分散トランザクション)

注文サービス → [order-events] → 在庫サービス → [inventory-events] → 決済サービス
                                      │                                    │
                                      ↓                                    ↓
                               在庫不足 → [compensation-events] → 注文キャンセル
                                                決済失敗 → [compensation-events] → 在庫戻し
// Sagaオーケストレーター
class OrderSaga {
  async execute(order: Order) {
    try {
      // Step 1: 在庫引き当て
      await publishMessage('inventory-commands', {
        action: 'reserve',
        orderId: order.id,
        items: order.items,
      });

      // Step 2: 決済処理
      await publishMessage('payment-commands', {
        action: 'charge',
        orderId: order.id,
        amount: order.amount,
      });

      // Step 3: 配送手配
      await publishMessage('shipping-commands', {
        action: 'schedule',
        orderId: order.id,
        address: order.shippingAddress,
      });
    } catch (error) {
      // 補償トランザクション
      await this.compensate(order, error);
    }
  }

  private async compensate(order: Order, error: Error) {
    await publishMessage('compensation-events', {
      orderId: order.id,
      reason: error.message,
      rollbackSteps: ['inventory', 'payment'],
    });
  }
}

監視とデバッグ

Cloud Monitoringとの連携

# 未確認メッセージ数の監視アラート
gcloud monitoring policies create \
  --display-name="Pub/Sub 未処理メッセージ急増" \
  --condition-display-name="num_undelivered > 1000" \
  --condition-filter='resource.type="pubsub_subscription" AND metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages"' \
  --condition-threshold-value=1000 \
  --condition-threshold-duration=300s

重要メトリクス

メトリクス意味閾値目安
num_undelivered_messages未配信メッセージ数< 100
oldest_unacked_message_age最古の未確認メッセージ経過時間< 300秒
push_request_latenciesPush配信のレイテンシ< 1秒
dead_letter_message_countDLQ送信数0が理想

コスト最適化

料金体系

項目料金
メッセージ発行$40 / TB
メッセージ配信$40 / TB
メッセージ保持(7日超)$0.27 / GB / 月
無料枠毎月10GB

コスト削減のポイント

  1. メッセージサイズを最小限に — 大容量データはCloud Storage経由で参照
  2. バッチ発行で1回のリクエストで複数メッセージ送信
  3. 不要なサブスクリプションを削除 — 配信分だけ課金される
  4. メッセージ保持期間の適正化 — 必要以上に長くしない

SES現場で求められるスキル

必須スキルレベル

レベル1: Pub/Subの概念理解、gcloudコマンド操作
レベル2: Pull/Pushの実装、エラーハンドリング
レベル3: DLQ設計、フィルタリング、監視設定
レベル4: マイクロサービス設計、Sagaパターン実装

単価への影響

  • メッセージング基盤の経験あり → 月額5〜10万円アップ
  • マイクロサービス設計スキル → 月額70〜95万円の案件に対応可能
  • イベント駆動アーキテクチャの構築経験 → アーキテクト案件で重宝される

よくある質問

Q: Pub/SubとAWS SQS/SNSの違いは?

比較項目Cloud Pub/SubAWS SQS + SNS
モデルPub/Sub統合キュー(SQS) + 通知(SNS)
スケーリング自動設定が必要な場合あり
メッセージ順序Ordering Key対応FIFO Queue
グローバルデフォルトでグローバルリージョン単位

Q: メッセージの順序保証は?

デフォルトではメッセージの順序は保証されません。順序が必要な場合はOrdering Keyを使用します:

const topic = pubsub.topic('order-events', {
  enableMessageOrdering: true,
});

await topic.publishMessage({
  data: Buffer.from(JSON.stringify(event)),
  orderingKey: `customer-${customerId}`, // 同じ顧客の注文は順序保証
});

Q: Exactly-onceは実現できる?

Pub/Subのデフォルトはat-least-once配信ですが、Exactly-once delivery機能をサブスクリプション単位で有効化できます:

gcloud pubsub subscriptions create order-processor \
  --topic=order-events \
  --enable-exactly-once-delivery

ただし、パフォーマンスへの影響があるため、冪等性を持たせる設計が推奨されます。

まとめ

Google Cloud Pub/Subは、マイクロサービス間の非同期通信基盤として不可欠なサービスです。

この記事のまとめ
  • Pub/Subはフルマネージドの非同期メッセージングサービス
  • Fan-out・負荷分散・フィルタリングなど多彩なパターン
  • DLQとリトライ戦略で信頼性の高いメッセージ処理
  • Cloud Functions連携でサーバーレスなイベント駆動処理
  • マイクロサービス設計スキルはSES高単価案件で必須

Google Cloud完全攻略シリーズの他の記事も合わせてご覧ください:

SES案件をお探しですか?

SES記事をもっと読む →
🏗️

SES BASE 編集長

SES業界歴10年以上のメンバーが在籍する編集チーム。SES企業での営業・エンジニア経験、フリーランス独立経験を持つメンバーが、業界のリアルな情報をお届けします。

📊 業界データに基づく記事制作 🔍 IPA・経済産業省データ参照 💼 SES実務経験者が執筆・監修