𝕏 f B! L
案件・求人数 12,345
案件を探す(準備中) エージェントを探す(準備中) お役立ち情報 ログイン
案件・求人数 12,345
Google Cloud Run Jobsバッチ処理完全ガイド|サーバーレスでスケーラブルなジョブ実行

Google Cloud Run Jobsバッチ処理完全ガイド|サーバーレスでスケーラブルなジョブ実行

Google CloudCloud Run Jobsバッチ処理サーバーレスコンテナ
目次

「バッチ処理のためだけにサーバーを常時稼働させるのはコストがもったいない」「Kubernetes上のCronJobの運用が複雑すぎる」——バッチ処理の効率的な実行は、多くの開発チームが直面する課題です。

Cloud Run Jobsは、サーバーレスでコンテナベースのバッチ処理を実行できるGoogle Cloudのサービスです。サーバー管理不要、使った分だけの課金、最大10,000タスクの並列実行が可能で、バッチ処理の運用コストを大幅に削減できます。

⚡ 3秒でわかる!この記事のポイント
  • Cloud Run Jobsはサーバーレスでコンテナバッチ処理を実行、サーバー管理不要
  • スケジュール実行・並列処理・リトライ機能で堅牢なバッチパイプラインを構築
  • SES案件でのGoogle Cloud需要が急増中、Cloud Run Jobsスキルが差別化要因に

Cloud Run Jobsとは

Cloud Run Jobsアーキテクチャ概要

Cloud Run Jobsは、Google Cloud Runの一機能として提供されるバッチ処理実行環境です。従来のCloud Run(サービス)がHTTPリクエストを処理するのに対し、Jobsは完了するまで実行するタスクに特化しています。

Cloud Run Service vs Cloud Run Jobs

特徴Cloud Run ServiceCloud Run Jobs
トリガーHTTPリクエスト手動/スケジュール/API
実行モデルリクエスト駆動完了まで実行
最大実行時間60分(デフォルト)24時間
並列実行オートスケール最大10,000タスク
課金リクエスト処理中タスク実行中
ユースケースWeb API、Webhookデータ処理、ETL、レポート

Cloud Run Jobsの主なユースケース

  • データ処理・ETL — 大量データの変換・集計・移行
  • レポート生成 — 日次/月次レポートの自動生成
  • バックアップ — データベースバックアップの定期実行
  • メール配信 — 大量メールの一括送信
  • 機械学習 — モデルの推論バッチ処理
  • ファイル処理 — 画像変換、PDF生成、CSVインポート
この記事でわかること
  • Cloud Run Jobsの基本概念と設定方法
  • バッチ処理の設計パターンと並列実行テクニック
  • スケジュール実行・エラーハンドリング・監視の設定
  • SES案件でのGoogle Cloud需要とキャリアへの影響

Cloud Run Jobsの基本設定

ジョブの作成

CLIでの作成:

# ジョブの作成
gcloud run jobs create data-processor \
  --image=asia-northeast1-docker.pkg.dev/PROJECT_ID/repo/data-processor:latest \
  --region=asia-northeast1 \
  --tasks=10 \
  --max-retries=3 \
  --task-timeout=3600 \
  --memory=2Gi \
  --cpu=2 \
  --set-env-vars="ENV=production,DB_HOST=10.0.0.1" \
  --set-secrets="DB_PASSWORD=db-password:latest" \
  --service-account=batch-processor@PROJECT_ID.iam.gserviceaccount.com \
  --vpc-connector=batch-vpc-connector

Terraformでの定義:

resource "google_cloud_run_v2_job" "data_processor" {
  name     = "data-processor"
  location = "asia-northeast1"

  template {
    task_count  = 10
    parallelism = 5

    template {
      timeout     = "3600s"
      max_retries = 3

      containers {
        image = "asia-northeast1-docker.pkg.dev/${var.project_id}/repo/data-processor:latest"

        resources {
          limits = {
            cpu    = "2"
            memory = "2Gi"
          }
        }

        env {
          name  = "ENV"
          value = "production"
        }

        env {
          name = "DB_PASSWORD"
          value_source {
            secret_key_ref {
              secret  = google_secret_manager_secret.db_password.secret_id
              version = "latest"
            }
          }
        }
      }

      service_account = google_service_account.batch_processor.email

      vpc_access {
        connector = google_vpc_access_connector.batch_connector.id
        egress    = "PRIVATE_RANGES_ONLY"
      }
    }
  }
}

コンテナイメージの構築

バッチ処理用のDockerイメージを構築します。

# Dockerfile
FROM node:20-slim AS builder
WORKDIR /app
COPY package*.json ./
RUN npm ci --production=false
COPY . .
RUN npm run build

FROM node:20-slim
WORKDIR /app
COPY --from=builder /app/dist ./dist
COPY --from=builder /app/node_modules ./node_modules
COPY --from=builder /app/package.json ./

# Cloud Run Jobsは環境変数でタスクインデックスを渡す
# CLOUD_RUN_TASK_INDEX: 0から始まるタスクインデックス
# CLOUD_RUN_TASK_COUNT: タスクの総数
# CLOUD_RUN_TASK_ATTEMPT: リトライ回数

CMD ["node", "dist/main.js"]

バッチ処理の設計パターン

パターン1: データ分割並列処理

大量データを複数タスクで分割処理する最も基本的なパターンです。

// src/main.ts - タスク分割処理
import { BigQuery } from '@google-cloud/bigquery';
import { Storage } from '@google-cloud/storage';

const TASK_INDEX = parseInt(process.env.CLOUD_RUN_TASK_INDEX || '0');
const TASK_COUNT = parseInt(process.env.CLOUD_RUN_TASK_COUNT || '1');
const ATTEMPT = parseInt(process.env.CLOUD_RUN_TASK_ATTEMPT || '0');

async function main() {
  console.log(`Task ${TASK_INDEX}/${TASK_COUNT} started (attempt: ${ATTEMPT})`);

  const bigquery = new BigQuery();
  const storage = new Storage();

  // データの範囲を計算(タスクインデックスに基づく分割)
  const totalRecords = await getTotalRecordCount(bigquery);
  const chunkSize = Math.ceil(totalRecords / TASK_COUNT);
  const offset = TASK_INDEX * chunkSize;
  const limit = Math.min(chunkSize, totalRecords - offset);

  console.log(`Processing records ${offset} to ${offset + limit}`);

  // データ取得と処理
  const query = `
    SELECT * FROM \`project.dataset.source_table\`
    ORDER BY id
    LIMIT ${limit} OFFSET ${offset}
  `;

  const [rows] = await bigquery.query({ query });

  // データ変換処理
  const processedData = rows.map((row: any) => transformRecord(row));

  // 結果をGCSに出力
  const bucket = storage.bucket('output-bucket');
  const file = bucket.file(`output/part-${TASK_INDEX.toString().padStart(5, '0')}.jsonl`);

  const content = processedData.map((d: any) => JSON.stringify(d)).join('\n');
  await file.save(content, { contentType: 'application/jsonl' });

  console.log(`Task ${TASK_INDEX} completed: ${processedData.length} records processed`);
}

function transformRecord(record: any) {
  return {
    id: record.id,
    name: record.name?.trim(),
    email: record.email?.toLowerCase(),
    processedAt: new Date().toISOString(),
    taskIndex: TASK_INDEX,
  };
}

async function getTotalRecordCount(bigquery: BigQuery): Promise<number> {
  const [result] = await bigquery.query({
    query: 'SELECT COUNT(*) as count FROM `project.dataset.source_table`',
  });
  return result[0].count;
}

main().catch((error) => {
  console.error(`Task ${TASK_INDEX} failed:`, error);
  process.exit(1);
});

パターン2: キュー駆動バッチ処理

Cloud Tasksと組み合わせて、キュー駆動のバッチ処理を実現します。

// src/queue-processor.ts
import { CloudTasksClient } from '@google-cloud/tasks';
import { Firestore } from '@google-cloud/firestore';

const db = new Firestore();
const TASK_INDEX = parseInt(process.env.CLOUD_RUN_TASK_INDEX || '0');

async function processQueue() {
  console.log(`Queue processor ${TASK_INDEX} started`);

  // ジョブキューからタスクを取得(排他制御)
  while (true) {
    const task = await claimNextTask();
    if (!task) {
      console.log('No more tasks in queue');
      break;
    }

    try {
      await processTask(task);
      await markTaskComplete(task.id);
      console.log(`Task ${task.id} completed`);
    } catch (error) {
      await markTaskFailed(task.id, error as Error);
      console.error(`Task ${task.id} failed:`, error);
    }
  }

  console.log(`Queue processor ${TASK_INDEX} finished`);
}

async function claimNextTask() {
  return db.runTransaction(async (tx) => {
    const snapshot = await tx
      .collection('job-queue')
      .where('status', '==', 'pending')
      .orderBy('priority', 'desc')
      .limit(1)
      .get();

    if (snapshot.empty) return null;

    const doc = snapshot.docs[0];
    tx.update(doc.ref, {
      status: 'processing',
      workerIndex: TASK_INDEX,
      startedAt: new Date(),
    });

    return { id: doc.id, ...doc.data() };
  });
}

async function processTask(task: any) {
  // タスクの種類に応じた処理
  switch (task.type) {
    case 'image-resize':
      await resizeImage(task.payload);
      break;
    case 'pdf-generate':
      await generatePdf(task.payload);
      break;
    case 'email-send':
      await sendEmail(task.payload);
      break;
    default:
      throw new Error(`Unknown task type: ${task.type}`);
  }
}

パターン3: ETLパイプライン

BigQuery → 変換 → Cloud Storage → BigQueryのETLパイプラインを構築します。

// src/etl-pipeline.ts
import { BigQuery } from '@google-cloud/bigquery';
import { Storage } from '@google-cloud/storage';

interface ETLConfig {
  sourceQuery: string;
  outputBucket: string;
  outputPrefix: string;
  destinationTable: string;
}

async function runETL(config: ETLConfig) {
  const bigquery = new BigQuery();
  const storage = new Storage();
  const taskIndex = parseInt(process.env.CLOUD_RUN_TASK_INDEX || '0');

  console.log(`[ETL] Task ${taskIndex}: Extract phase started`);

  // Extract: BigQueryからデータ取得
  const [extractJob] = await bigquery.createQueryJob({
    query: config.sourceQuery,
    location: 'asia-northeast1',
  });
  const [rows] = await extractJob.getQueryResults();

  console.log(`[ETL] Task ${taskIndex}: ${rows.length} rows extracted`);

  // Transform: データ変換
  const transformed = rows
    .map((row: any) => {
      try {
        return {
          ...row,
          processed_date: new Date().toISOString().split('T')[0],
          revenue_jpy: Math.round(row.revenue_usd * 150),
          category_normalized: normalizeCategory(row.category),
          is_valid: validateRecord(row),
        };
      } catch (e) {
        console.warn(`Transform error for row ${row.id}:`, e);
        return null;
      }
    })
    .filter(Boolean);

  console.log(`[ETL] Task ${taskIndex}: ${transformed.length} rows transformed`);

  // Load: 結果をCloud Storageに出力
  const outputPath = `${config.outputPrefix}/part-${taskIndex}.jsonl`;
  const bucket = storage.bucket(config.outputBucket);
  const file = bucket.file(outputPath);

  await file.save(
    transformed.map((r: any) => JSON.stringify(r)).join('\n'),
    { contentType: 'application/jsonl' }
  );

  // BigQueryにロード
  const [loadJob] = await bigquery
    .dataset('analytics')
    .table(config.destinationTable)
    .load(file, {
      sourceFormat: 'NEWLINE_DELIMITED_JSON',
      writeDisposition: 'WRITE_APPEND',
      autodetect: false,
    });

  console.log(`[ETL] Task ${taskIndex}: Load complete, ${loadJob.status?.statistics?.load?.outputRows} rows loaded`);
}

スケジュール実行の設定

Cloud Schedulerとの連携

定期的なバッチ処理をCloud Schedulerで自動実行します。

# 毎日午前2時(JST)に実行
gcloud scheduler jobs create http daily-data-sync \
  --location=asia-northeast1 \
  --schedule="0 2 * * *" \
  --time-zone="Asia/Tokyo" \
  --uri="https://asia-northeast1-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/PROJECT_ID/jobs/data-processor:run" \
  --http-method=POST \
  --oauth-service-account-email=scheduler-sa@PROJECT_ID.iam.gserviceaccount.com

Terraform設定:

resource "google_cloud_scheduler_job" "daily_batch" {
  name      = "daily-data-sync"
  region    = "asia-northeast1"
  schedule  = "0 2 * * *"
  time_zone = "Asia/Tokyo"

  http_target {
    http_method = "POST"
    uri         = "https://asia-northeast1-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/${var.project_id}/jobs/${google_cloud_run_v2_job.data_processor.name}:run"

    oauth_token {
      service_account_email = google_service_account.scheduler.email
    }
  }

  retry_config {
    retry_count          = 3
    min_backoff_duration = "30s"
    max_backoff_duration = "300s"
  }
}

Eventarcによるイベント駆動実行

GCSへのファイルアップロードをトリガーにジョブを実行します。

# GCSのファイル作成をトリガーに実行
gcloud eventarc triggers create csv-upload-trigger \
  --location=asia-northeast1 \
  --destination-run-job=csv-processor \
  --destination-run-region=asia-northeast1 \
  --event-filters="type=google.cloud.storage.object.v1.finalized" \
  --event-filters="bucket=input-csv-bucket" \
  --service-account=eventarc-sa@PROJECT_ID.iam.gserviceaccount.com

エラーハンドリングとリトライ

リトライ戦略の設計

// src/utils/retry.ts
interface RetryOptions {
  maxRetries: number;
  initialDelay: number;
  maxDelay: number;
  backoffFactor: number;
}

async function withRetry<T>(
  fn: () => Promise<T>,
  options: RetryOptions = {
    maxRetries: 3,
    initialDelay: 1000,
    maxDelay: 30000,
    backoffFactor: 2,
  }
): Promise<T> {
  let lastError: Error | undefined;
  let delay = options.initialDelay;

  for (let attempt = 0; attempt <= options.maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error as Error;
      
      if (attempt === options.maxRetries) break;

      // リトライ可能なエラーか判定
      if (!isRetryableError(error)) {
        throw error;
      }

      console.warn(
        `Attempt ${attempt + 1} failed, retrying in ${delay}ms:`,
        (error as Error).message
      );

      await new Promise((r) => setTimeout(r, delay));
      delay = Math.min(delay * options.backoffFactor, options.maxDelay);
    }
  }

  throw lastError;
}

function isRetryableError(error: any): boolean {
  // ネットワークエラー、一時的なサーバーエラー
  if (error.code === 'ECONNRESET' || error.code === 'ETIMEDOUT') return true;
  if (error.status >= 500 && error.status < 600) return true;
  if (error.status === 429) return true; // Rate limit
  return false;
}

デッドレターキューの実装

// 処理失敗したレコードをデッドレターキューに送信
async function sendToDeadLetterQueue(
  record: any,
  error: Error,
  taskIndex: number
) {
  const { PubSub } = require('@google-cloud/pubsub');
  const pubsub = new PubSub();

  const topic = pubsub.topic('dead-letter-queue');
  await topic.publishMessage({
    data: Buffer.from(JSON.stringify({
      record,
      error: {
        message: error.message,
        stack: error.stack,
      },
      metadata: {
        taskIndex,
        timestamp: new Date().toISOString(),
        jobName: process.env.CLOUD_RUN_JOB || 'unknown',
        attempt: parseInt(process.env.CLOUD_RUN_TASK_ATTEMPT || '0'),
      },
    })),
  });

  console.log(`Record ${record.id} sent to dead letter queue`);
}

監視とアラート

Cloud Monitoringの設定

# monitoring/alert-policy.yaml
displayName: "Cloud Run Job Failure Alert"
conditions:
  - displayName: "Job execution failed"
    conditionThreshold:
      filter: |
        resource.type = "cloud_run_job"
        AND metric.type = "run.googleapis.com/job/completed_task_attempt_count"
        AND metric.labels.result = "failed"
      comparison: COMPARISON_GT
      thresholdValue: 0
      duration: "0s"
      aggregations:
        - alignmentPeriod: "300s"
          perSeriesAligner: ALIGN_SUM
notificationChannels:
  - projects/PROJECT_ID/notificationChannels/CHANNEL_ID
alertStrategy:
  autoClose: "1800s"

ログベースのモニタリング

// src/utils/logger.ts
import { Logging } from '@google-cloud/logging';

const logging = new Logging();
const log = logging.log('batch-processor');

interface JobMetrics {
  jobName: string;
  taskIndex: number;
  recordsProcessed: number;
  recordsFailed: number;
  durationMs: number;
  status: 'success' | 'partial_failure' | 'failure';
}

async function logJobMetrics(metrics: JobMetrics) {
  const entry = log.entry(
    {
      resource: {
        type: 'cloud_run_job',
        labels: {
          job_name: metrics.jobName,
          location: 'asia-northeast1',
        },
      },
      severity: metrics.status === 'failure' ? 'ERROR' : 'INFO',
    },
    {
      message: `Job ${metrics.jobName} task ${metrics.taskIndex}: ${metrics.status}`,
      ...metrics,
    }
  );

  await log.write(entry);
}

コスト最適化

料金体系の理解

Cloud Run Jobsの料金は、タスク実行中のリソース使用量に基づきます。

リソース料金(asia-northeast1)無料枠
vCPU$0.00002400/vCPU秒180,000 vCPU秒/月
メモリ$0.00000250/GiB秒360,000 GiB秒/月
ネットワーク$0.12/GB(外部)1GB/月

コスト試算例:

日次バッチ処理(30日/月)
- 10タスク並列、各タスク10分
- CPU: 2 vCPU、メモリ: 2GiB

vCPU: 2 × 600秒 × 10タスク × 30日 = 360,000 vCPU秒
→ 360,000 × $0.0000240 = $8.64/月

メモリ: 2 × 600秒 × 10タスク × 30日 = 360,000 GiB秒
→ 360,000 × $0.0000025 = $0.90/月

合計: 約$9.54/月(約1,430円)

コスト削減のテクニック

テクニック効果実装難易度
タスク数の最適化並列度を下げてコスト削減★☆☆
処理の効率化実行時間短縮★★☆
リソース最適化CPU/メモリの適正化★☆☆
バッチ統合小さなジョブを統合★★☆
無料枠の活用月間無料枠内に収める★☆☆

SES現場での活用パターン

Google Cloud案件の動向

2026年のSES市場では、Google Cloudエンジニアの需要が急速に伸びています。

ポジション月単価相場必要スキル
GCPバッチ処理エンジニア65-85万円Cloud Run Jobs, BigQuery, GCS
GCPデータエンジニア70-95万円Dataflow, BigQuery, Pub/Sub
GCPインフラエンジニア70-90万円Terraform, GKE, Cloud Run
GCPフルスタック75-100万円Cloud Run, Firebase, BigQuery

Cloud Run Jobsの実務活用例

金融データ処理:

用途: 日次の取引データ集計・レポート生成
構成: Cloud Scheduler → Cloud Run Jobs (20並列) → BigQuery
処理量: 500万レコード/日
実行時間: 15分(並列処理)
コスト: 約$15/月

Eコマースデータ同期:

用途: 商品・在庫データの基幹システム連携
構成: Eventarc → Cloud Run Jobs → Firestore/BigQuery
処理量: 10万商品/回
実行頻度: 1時間ごと
コスト: 約$25/月

まとめ:Cloud Run Jobsでバッチ処理を最適化する

Cloud Run Jobsは、サーバーレスでスケーラブルなバッチ処理を実現する強力なサービスです。

本記事のポイント:

  • サーバー管理不要で最大10,000タスクの並列バッチ処理を実行可能
  • Cloud Scheduler/Eventarcとの連携で定期実行・イベント駆動を実現
  • リトライ・デッドレターキュー・監視の実装で堅牢なバッチパイプラインを構築
  • SES案件でのGoogle Cloud需要拡大に対応するスキルアップに最適

Google Cloud Run Jobsのスキルを身につけて、クラウドエンジニアとしての市場価値を高めましょう。


関連記事

SES案件をお探しですか?

SES記事をもっと読む →
🏗️

SES BASE 編集長

SES業界歴10年以上のメンバーが在籍する編集チーム。SES企業での営業・エンジニア経験、フリーランス独立経験を持つメンバーが、業界のリアルな情報をお届けします。

📊 業界データに基づく記事制作 🔍 IPA・経済産業省データ参照 💼 SES実務経験者が執筆・監修