「バッチ処理のためだけにサーバーを常時稼働させるのはコストがもったいない」「Kubernetes上のCronJobの運用が複雑すぎる」——バッチ処理の効率的な実行は、多くの開発チームが直面する課題です。
Cloud Run Jobsは、サーバーレスでコンテナベースのバッチ処理を実行できるGoogle Cloudのサービスです。サーバー管理不要、使った分だけの課金、最大10,000タスクの並列実行が可能で、バッチ処理の運用コストを大幅に削減できます。
- Cloud Run Jobsはサーバーレスでコンテナバッチ処理を実行、サーバー管理不要
- スケジュール実行・並列処理・リトライ機能で堅牢なバッチパイプラインを構築
- SES案件でのGoogle Cloud需要が急増中、Cloud Run Jobsスキルが差別化要因に
Cloud Run Jobsとは

Cloud Run Jobsは、Google Cloud Runの一機能として提供されるバッチ処理実行環境です。従来のCloud Run(サービス)がHTTPリクエストを処理するのに対し、Jobsは完了するまで実行するタスクに特化しています。
Cloud Run Service vs Cloud Run Jobs
| 特徴 | Cloud Run Service | Cloud Run Jobs |
|---|---|---|
| トリガー | HTTPリクエスト | 手動/スケジュール/API |
| 実行モデル | リクエスト駆動 | 完了まで実行 |
| 最大実行時間 | 60分(デフォルト) | 24時間 |
| 並列実行 | オートスケール | 最大10,000タスク |
| 課金 | リクエスト処理中 | タスク実行中 |
| ユースケース | Web API、Webhook | データ処理、ETL、レポート |
Cloud Run Jobsの主なユースケース
- データ処理・ETL — 大量データの変換・集計・移行
- レポート生成 — 日次/月次レポートの自動生成
- バックアップ — データベースバックアップの定期実行
- メール配信 — 大量メールの一括送信
- 機械学習 — モデルの推論バッチ処理
- ファイル処理 — 画像変換、PDF生成、CSVインポート
- Cloud Run Jobsの基本概念と設定方法
- バッチ処理の設計パターンと並列実行テクニック
- スケジュール実行・エラーハンドリング・監視の設定
- SES案件でのGoogle Cloud需要とキャリアへの影響
Cloud Run Jobsの基本設定
ジョブの作成
CLIでの作成:
# ジョブの作成
gcloud run jobs create data-processor \
--image=asia-northeast1-docker.pkg.dev/PROJECT_ID/repo/data-processor:latest \
--region=asia-northeast1 \
--tasks=10 \
--max-retries=3 \
--task-timeout=3600 \
--memory=2Gi \
--cpu=2 \
--set-env-vars="ENV=production,DB_HOST=10.0.0.1" \
--set-secrets="DB_PASSWORD=db-password:latest" \
--service-account=batch-processor@PROJECT_ID.iam.gserviceaccount.com \
--vpc-connector=batch-vpc-connector
Terraformでの定義:
resource "google_cloud_run_v2_job" "data_processor" {
name = "data-processor"
location = "asia-northeast1"
template {
task_count = 10
parallelism = 5
template {
timeout = "3600s"
max_retries = 3
containers {
image = "asia-northeast1-docker.pkg.dev/${var.project_id}/repo/data-processor:latest"
resources {
limits = {
cpu = "2"
memory = "2Gi"
}
}
env {
name = "ENV"
value = "production"
}
env {
name = "DB_PASSWORD"
value_source {
secret_key_ref {
secret = google_secret_manager_secret.db_password.secret_id
version = "latest"
}
}
}
}
service_account = google_service_account.batch_processor.email
vpc_access {
connector = google_vpc_access_connector.batch_connector.id
egress = "PRIVATE_RANGES_ONLY"
}
}
}
}
コンテナイメージの構築
バッチ処理用のDockerイメージを構築します。
# Dockerfile
FROM node:20-slim AS builder
WORKDIR /app
COPY package*.json ./
RUN npm ci --production=false
COPY . .
RUN npm run build
FROM node:20-slim
WORKDIR /app
COPY --from=builder /app/dist ./dist
COPY --from=builder /app/node_modules ./node_modules
COPY --from=builder /app/package.json ./
# Cloud Run Jobsは環境変数でタスクインデックスを渡す
# CLOUD_RUN_TASK_INDEX: 0から始まるタスクインデックス
# CLOUD_RUN_TASK_COUNT: タスクの総数
# CLOUD_RUN_TASK_ATTEMPT: リトライ回数
CMD ["node", "dist/main.js"]
バッチ処理の設計パターン
パターン1: データ分割並列処理
大量データを複数タスクで分割処理する最も基本的なパターンです。
// src/main.ts - タスク分割処理
import { BigQuery } from '@google-cloud/bigquery';
import { Storage } from '@google-cloud/storage';
const TASK_INDEX = parseInt(process.env.CLOUD_RUN_TASK_INDEX || '0');
const TASK_COUNT = parseInt(process.env.CLOUD_RUN_TASK_COUNT || '1');
const ATTEMPT = parseInt(process.env.CLOUD_RUN_TASK_ATTEMPT || '0');
async function main() {
console.log(`Task ${TASK_INDEX}/${TASK_COUNT} started (attempt: ${ATTEMPT})`);
const bigquery = new BigQuery();
const storage = new Storage();
// データの範囲を計算(タスクインデックスに基づく分割)
const totalRecords = await getTotalRecordCount(bigquery);
const chunkSize = Math.ceil(totalRecords / TASK_COUNT);
const offset = TASK_INDEX * chunkSize;
const limit = Math.min(chunkSize, totalRecords - offset);
console.log(`Processing records ${offset} to ${offset + limit}`);
// データ取得と処理
const query = `
SELECT * FROM \`project.dataset.source_table\`
ORDER BY id
LIMIT ${limit} OFFSET ${offset}
`;
const [rows] = await bigquery.query({ query });
// データ変換処理
const processedData = rows.map((row: any) => transformRecord(row));
// 結果をGCSに出力
const bucket = storage.bucket('output-bucket');
const file = bucket.file(`output/part-${TASK_INDEX.toString().padStart(5, '0')}.jsonl`);
const content = processedData.map((d: any) => JSON.stringify(d)).join('\n');
await file.save(content, { contentType: 'application/jsonl' });
console.log(`Task ${TASK_INDEX} completed: ${processedData.length} records processed`);
}
function transformRecord(record: any) {
return {
id: record.id,
name: record.name?.trim(),
email: record.email?.toLowerCase(),
processedAt: new Date().toISOString(),
taskIndex: TASK_INDEX,
};
}
async function getTotalRecordCount(bigquery: BigQuery): Promise<number> {
const [result] = await bigquery.query({
query: 'SELECT COUNT(*) as count FROM `project.dataset.source_table`',
});
return result[0].count;
}
main().catch((error) => {
console.error(`Task ${TASK_INDEX} failed:`, error);
process.exit(1);
});
パターン2: キュー駆動バッチ処理
Cloud Tasksと組み合わせて、キュー駆動のバッチ処理を実現します。
// src/queue-processor.ts
import { CloudTasksClient } from '@google-cloud/tasks';
import { Firestore } from '@google-cloud/firestore';
const db = new Firestore();
const TASK_INDEX = parseInt(process.env.CLOUD_RUN_TASK_INDEX || '0');
async function processQueue() {
console.log(`Queue processor ${TASK_INDEX} started`);
// ジョブキューからタスクを取得(排他制御)
while (true) {
const task = await claimNextTask();
if (!task) {
console.log('No more tasks in queue');
break;
}
try {
await processTask(task);
await markTaskComplete(task.id);
console.log(`Task ${task.id} completed`);
} catch (error) {
await markTaskFailed(task.id, error as Error);
console.error(`Task ${task.id} failed:`, error);
}
}
console.log(`Queue processor ${TASK_INDEX} finished`);
}
async function claimNextTask() {
return db.runTransaction(async (tx) => {
const snapshot = await tx
.collection('job-queue')
.where('status', '==', 'pending')
.orderBy('priority', 'desc')
.limit(1)
.get();
if (snapshot.empty) return null;
const doc = snapshot.docs[0];
tx.update(doc.ref, {
status: 'processing',
workerIndex: TASK_INDEX,
startedAt: new Date(),
});
return { id: doc.id, ...doc.data() };
});
}
async function processTask(task: any) {
// タスクの種類に応じた処理
switch (task.type) {
case 'image-resize':
await resizeImage(task.payload);
break;
case 'pdf-generate':
await generatePdf(task.payload);
break;
case 'email-send':
await sendEmail(task.payload);
break;
default:
throw new Error(`Unknown task type: ${task.type}`);
}
}
パターン3: ETLパイプライン
BigQuery → 変換 → Cloud Storage → BigQueryのETLパイプラインを構築します。
// src/etl-pipeline.ts
import { BigQuery } from '@google-cloud/bigquery';
import { Storage } from '@google-cloud/storage';
interface ETLConfig {
sourceQuery: string;
outputBucket: string;
outputPrefix: string;
destinationTable: string;
}
async function runETL(config: ETLConfig) {
const bigquery = new BigQuery();
const storage = new Storage();
const taskIndex = parseInt(process.env.CLOUD_RUN_TASK_INDEX || '0');
console.log(`[ETL] Task ${taskIndex}: Extract phase started`);
// Extract: BigQueryからデータ取得
const [extractJob] = await bigquery.createQueryJob({
query: config.sourceQuery,
location: 'asia-northeast1',
});
const [rows] = await extractJob.getQueryResults();
console.log(`[ETL] Task ${taskIndex}: ${rows.length} rows extracted`);
// Transform: データ変換
const transformed = rows
.map((row: any) => {
try {
return {
...row,
processed_date: new Date().toISOString().split('T')[0],
revenue_jpy: Math.round(row.revenue_usd * 150),
category_normalized: normalizeCategory(row.category),
is_valid: validateRecord(row),
};
} catch (e) {
console.warn(`Transform error for row ${row.id}:`, e);
return null;
}
})
.filter(Boolean);
console.log(`[ETL] Task ${taskIndex}: ${transformed.length} rows transformed`);
// Load: 結果をCloud Storageに出力
const outputPath = `${config.outputPrefix}/part-${taskIndex}.jsonl`;
const bucket = storage.bucket(config.outputBucket);
const file = bucket.file(outputPath);
await file.save(
transformed.map((r: any) => JSON.stringify(r)).join('\n'),
{ contentType: 'application/jsonl' }
);
// BigQueryにロード
const [loadJob] = await bigquery
.dataset('analytics')
.table(config.destinationTable)
.load(file, {
sourceFormat: 'NEWLINE_DELIMITED_JSON',
writeDisposition: 'WRITE_APPEND',
autodetect: false,
});
console.log(`[ETL] Task ${taskIndex}: Load complete, ${loadJob.status?.statistics?.load?.outputRows} rows loaded`);
}
スケジュール実行の設定
Cloud Schedulerとの連携
定期的なバッチ処理をCloud Schedulerで自動実行します。
# 毎日午前2時(JST)に実行
gcloud scheduler jobs create http daily-data-sync \
--location=asia-northeast1 \
--schedule="0 2 * * *" \
--time-zone="Asia/Tokyo" \
--uri="https://asia-northeast1-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/PROJECT_ID/jobs/data-processor:run" \
--http-method=POST \
--oauth-service-account-email=scheduler-sa@PROJECT_ID.iam.gserviceaccount.com
Terraform設定:
resource "google_cloud_scheduler_job" "daily_batch" {
name = "daily-data-sync"
region = "asia-northeast1"
schedule = "0 2 * * *"
time_zone = "Asia/Tokyo"
http_target {
http_method = "POST"
uri = "https://asia-northeast1-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/${var.project_id}/jobs/${google_cloud_run_v2_job.data_processor.name}:run"
oauth_token {
service_account_email = google_service_account.scheduler.email
}
}
retry_config {
retry_count = 3
min_backoff_duration = "30s"
max_backoff_duration = "300s"
}
}
Eventarcによるイベント駆動実行
GCSへのファイルアップロードをトリガーにジョブを実行します。
# GCSのファイル作成をトリガーに実行
gcloud eventarc triggers create csv-upload-trigger \
--location=asia-northeast1 \
--destination-run-job=csv-processor \
--destination-run-region=asia-northeast1 \
--event-filters="type=google.cloud.storage.object.v1.finalized" \
--event-filters="bucket=input-csv-bucket" \
--service-account=eventarc-sa@PROJECT_ID.iam.gserviceaccount.com
エラーハンドリングとリトライ
リトライ戦略の設計
// src/utils/retry.ts
interface RetryOptions {
maxRetries: number;
initialDelay: number;
maxDelay: number;
backoffFactor: number;
}
async function withRetry<T>(
fn: () => Promise<T>,
options: RetryOptions = {
maxRetries: 3,
initialDelay: 1000,
maxDelay: 30000,
backoffFactor: 2,
}
): Promise<T> {
let lastError: Error | undefined;
let delay = options.initialDelay;
for (let attempt = 0; attempt <= options.maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error as Error;
if (attempt === options.maxRetries) break;
// リトライ可能なエラーか判定
if (!isRetryableError(error)) {
throw error;
}
console.warn(
`Attempt ${attempt + 1} failed, retrying in ${delay}ms:`,
(error as Error).message
);
await new Promise((r) => setTimeout(r, delay));
delay = Math.min(delay * options.backoffFactor, options.maxDelay);
}
}
throw lastError;
}
function isRetryableError(error: any): boolean {
// ネットワークエラー、一時的なサーバーエラー
if (error.code === 'ECONNRESET' || error.code === 'ETIMEDOUT') return true;
if (error.status >= 500 && error.status < 600) return true;
if (error.status === 429) return true; // Rate limit
return false;
}
デッドレターキューの実装
// 処理失敗したレコードをデッドレターキューに送信
async function sendToDeadLetterQueue(
record: any,
error: Error,
taskIndex: number
) {
const { PubSub } = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('dead-letter-queue');
await topic.publishMessage({
data: Buffer.from(JSON.stringify({
record,
error: {
message: error.message,
stack: error.stack,
},
metadata: {
taskIndex,
timestamp: new Date().toISOString(),
jobName: process.env.CLOUD_RUN_JOB || 'unknown',
attempt: parseInt(process.env.CLOUD_RUN_TASK_ATTEMPT || '0'),
},
})),
});
console.log(`Record ${record.id} sent to dead letter queue`);
}
監視とアラート
Cloud Monitoringの設定
# monitoring/alert-policy.yaml
displayName: "Cloud Run Job Failure Alert"
conditions:
- displayName: "Job execution failed"
conditionThreshold:
filter: |
resource.type = "cloud_run_job"
AND metric.type = "run.googleapis.com/job/completed_task_attempt_count"
AND metric.labels.result = "failed"
comparison: COMPARISON_GT
thresholdValue: 0
duration: "0s"
aggregations:
- alignmentPeriod: "300s"
perSeriesAligner: ALIGN_SUM
notificationChannels:
- projects/PROJECT_ID/notificationChannels/CHANNEL_ID
alertStrategy:
autoClose: "1800s"
ログベースのモニタリング
// src/utils/logger.ts
import { Logging } from '@google-cloud/logging';
const logging = new Logging();
const log = logging.log('batch-processor');
interface JobMetrics {
jobName: string;
taskIndex: number;
recordsProcessed: number;
recordsFailed: number;
durationMs: number;
status: 'success' | 'partial_failure' | 'failure';
}
async function logJobMetrics(metrics: JobMetrics) {
const entry = log.entry(
{
resource: {
type: 'cloud_run_job',
labels: {
job_name: metrics.jobName,
location: 'asia-northeast1',
},
},
severity: metrics.status === 'failure' ? 'ERROR' : 'INFO',
},
{
message: `Job ${metrics.jobName} task ${metrics.taskIndex}: ${metrics.status}`,
...metrics,
}
);
await log.write(entry);
}
コスト最適化
料金体系の理解
Cloud Run Jobsの料金は、タスク実行中のリソース使用量に基づきます。
| リソース | 料金(asia-northeast1) | 無料枠 |
|---|---|---|
| vCPU | $0.00002400/vCPU秒 | 180,000 vCPU秒/月 |
| メモリ | $0.00000250/GiB秒 | 360,000 GiB秒/月 |
| ネットワーク | $0.12/GB(外部) | 1GB/月 |
コスト試算例:
日次バッチ処理(30日/月)
- 10タスク並列、各タスク10分
- CPU: 2 vCPU、メモリ: 2GiB
vCPU: 2 × 600秒 × 10タスク × 30日 = 360,000 vCPU秒
→ 360,000 × $0.0000240 = $8.64/月
メモリ: 2 × 600秒 × 10タスク × 30日 = 360,000 GiB秒
→ 360,000 × $0.0000025 = $0.90/月
合計: 約$9.54/月(約1,430円)
コスト削減のテクニック
| テクニック | 効果 | 実装難易度 |
|---|---|---|
| タスク数の最適化 | 並列度を下げてコスト削減 | ★☆☆ |
| 処理の効率化 | 実行時間短縮 | ★★☆ |
| リソース最適化 | CPU/メモリの適正化 | ★☆☆ |
| バッチ統合 | 小さなジョブを統合 | ★★☆ |
| 無料枠の活用 | 月間無料枠内に収める | ★☆☆ |
SES現場での活用パターン
Google Cloud案件の動向
2026年のSES市場では、Google Cloudエンジニアの需要が急速に伸びています。
| ポジション | 月単価相場 | 必要スキル |
|---|---|---|
| GCPバッチ処理エンジニア | 65-85万円 | Cloud Run Jobs, BigQuery, GCS |
| GCPデータエンジニア | 70-95万円 | Dataflow, BigQuery, Pub/Sub |
| GCPインフラエンジニア | 70-90万円 | Terraform, GKE, Cloud Run |
| GCPフルスタック | 75-100万円 | Cloud Run, Firebase, BigQuery |
Cloud Run Jobsの実務活用例
金融データ処理:
用途: 日次の取引データ集計・レポート生成
構成: Cloud Scheduler → Cloud Run Jobs (20並列) → BigQuery
処理量: 500万レコード/日
実行時間: 15分(並列処理)
コスト: 約$15/月
Eコマースデータ同期:
用途: 商品・在庫データの基幹システム連携
構成: Eventarc → Cloud Run Jobs → Firestore/BigQuery
処理量: 10万商品/回
実行頻度: 1時間ごと
コスト: 約$25/月
まとめ:Cloud Run Jobsでバッチ処理を最適化する
Cloud Run Jobsは、サーバーレスでスケーラブルなバッチ処理を実現する強力なサービスです。
本記事のポイント:
- サーバー管理不要で最大10,000タスクの並列バッチ処理を実行可能
- Cloud Scheduler/Eventarcとの連携で定期実行・イベント駆動を実現
- リトライ・デッドレターキュー・監視の実装で堅牢なバッチパイプラインを構築
- SES案件でのGoogle Cloud需要拡大に対応するスキルアップに最適
Google Cloud Run Jobsのスキルを身につけて、クラウドエンジニアとしての市場価値を高めましょう。