「データの取り込みから加工・集約まで、パイプライン構築に毎回膨大な時間がかかる」——データエンジニアリングに携わるSESエンジニアなら、一度は感じたことがある悩みでしょう。
結論から言えば、Claude Codeを活用することでETL設計からバッチ処理、リアルタイムストリーミングまで、データパイプライン構築の工数を大幅に短縮できます。本記事では、実際の開発フローに沿って具体的なプロンプト例とともに解説します。
この記事を3秒でまとめると
- Claude CodeはETL設計・スキーマ定義・データ変換ロジックの生成に強い
- CLAUDE.mdにデータソース情報を記述することでコンテキスト精度が上がる
- バッチ処理からリアルタイムストリーミングまで一貫してAIで効率化できる

データパイプライン構築でClaude Codeが威力を発揮する理由
データパイプラインの構築は、単にコードを書くだけでは終わりません。データソースの特性理解、スキーマ設計、変換ロジック、エラーハンドリング、リトライ戦略——多くの要素が複雑に絡み合います。
Claude Codeがこの領域で力を発揮する理由は、プロジェクト全体を俯瞰してコードを生成できる点にあります。CSVファイルの構造、DBテーブルの定義、APIレスポンスのスキーマをすべて把握した上で、適切な変換処理を提案してくれるのです。
Claude Codeが得意なデータパイプラインタスク
| タスク | 具体的な活用法 | 効率化の度合い |
|---|---|---|
| ETL設計 | 抽出・変換・ロードの各ステップのコード生成 | 工数70%削減 |
| スキーマ定義 | データソースからのスキーマ自動推論 | 工数60%削減 |
| バッチ処理 | cron/スケジューラ連携のジョブ設計 | 工数50%削減 |
| データ品質チェック | バリデーションルールの自動生成 | 工数80%削減 |
| エラーハンドリング | リトライ・デッドレターキュー設計 | 工数55%削減 |
なぜSESエンジニアにデータパイプラインスキルが求められるのか
SES市場において、データエンジニアリング案件の需要は年々増加しています。従来の「アプリケーション開発」中心の案件から、データ基盤構築・データ分析基盤の整備を求める案件が急増しているのです。
具体的には以下のような案件が増えています。
- データレイク/データウェアハウス構築: S3 + Glue + Redshift、BigQuery連携
- リアルタイムデータ処理: Kafka + Flink、Kinesis + Lambda
- MLパイプライン: 特徴量エンジニアリング + モデル学習パイプライン
- データ品質管理: Great Expectations、dbt testの導入
Claude Codeを使いこなせれば、こうした案件への参画もスムーズになります。
CLAUDE.mdでデータパイプラインの文脈を伝える
Claude Codeの精度を最大限に引き出すには、CLAUDE.mdファイルにプロジェクトの文脈を適切に記述することが重要です。データパイプラインプロジェクトでは、以下の情報を含めると効果的です。
# CLAUDE.md - データパイプラインプロジェクト
## プロジェクト概要
ECサイトの注文データを集約するETLパイプライン
## データソース
- PostgreSQL (注文DB): orders, order_items, customers テーブル
- REST API (在庫管理): GET /api/v1/inventory
- S3バケット (CSVアップロード): s3://raw-data/daily-reports/
## 出力先
- BigQuery: analytics.orders_fact, analytics.customers_dim
- Redshift: warehouse.daily_summary
## 技術スタック
- Python 3.12 + Apache Airflow 2.8
- pandas / polars でデータ変換
- SQLAlchemy でDB接続
- boto3 でAWS連携
## コーディング規約
- 型ヒント必須 (mypy strict)
- docstring は Google style
- テストは pytest + fixtures
この情報があるだけで、Claude Codeの提案精度が格段に向上します。「orders テーブルから月次集計を出して」と指示するだけで、適切なJOINとGROUP BYを含むクエリを生成してくれるようになります。
ETLパイプラインの設計をClaude Codeで効率化する
Extract(抽出)フェーズ
データ抽出では、複数のソースからデータを取得するコードを書く必要があります。Claude Codeを使えば、各ソースに応じた接続・取得ロジックを素早く生成できます。
プロンプト例:
PostgreSQLのordersテーブルから前日分のデータを抽出するExtractorクラスを作成して。
要件:
- SQLAlchemy使用
- 日付範囲指定で抽出
- バッチサイズ10,000件ずつ取得
- 接続エラー時は3回リトライ
- 抽出件数をログ出力
Claude Codeはこのプロンプトから、以下のような堅牢な抽出コードを生成します。
import logging
from datetime import date, timedelta
from typing import Iterator
import pandas as pd
from sqlalchemy import create_engine, text
from tenacity import retry, stop_after_attempt, wait_exponential
logger = logging.getLogger(__name__)
class OrderExtractor:
"""注文データの抽出を担当するクラス"""
def __init__(self, connection_url: str, batch_size: int = 10_000) -> None:
self.engine = create_engine(connection_url)
self.batch_size = batch_size
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=30),
)
def extract(
self,
start_date: date | None = None,
end_date: date | None = None,
) -> Iterator[pd.DataFrame]:
"""指定日付範囲の注文データをバッチで抽出する"""
if start_date is None:
start_date = date.today() - timedelta(days=1)
if end_date is None:
end_date = date.today()
query = text("""
SELECT o.id, o.customer_id, o.total_amount,
o.status, o.created_at
FROM orders o
WHERE o.created_at >= :start_date
AND o.created_at < :end_date
ORDER BY o.created_at
""")
total_rows = 0
with self.engine.connect() as conn:
for chunk in pd.read_sql(
query,
conn,
params={"start_date": start_date, "end_date": end_date},
chunksize=self.batch_size,
):
total_rows += len(chunk)
logger.info(f"抽出済み: {total_rows}件")
yield chunk
logger.info(f"抽出完了: 合計{total_rows}件")
Transform(変換)フェーズ
データ変換は最も複雑になりやすいフェーズです。Claude Codeに変換ルールを伝えることで、正確な変換ロジックを生成できます。
プロンプト例:
注文データの変換処理を作成して。
変換ルール:
1. 金額をJPYからUSDに変換(為替レートはAPIから取得)
2. ステータスを英語から日本語にマッピング(pending→未処理, completed→完了, cancelled→キャンセル)
3. 顧客IDから顧客セグメント(新規/リピーター/VIP)を判定
4. 日付カラムからyear, month, day_of_weekを抽出
5. null値のデフォルト処理
polarsを使って高速に処理すること。
Claude Codeは型安全で高速なpolarsベースの変換コードを生成してくれます。
import polars as pl
from enum import Enum
class CustomerSegment(str, Enum):
NEW = "新規"
REPEAT = "リピーター"
VIP = "VIP"
STATUS_MAP = {
"pending": "未処理",
"completed": "完了",
"cancelled": "キャンセル",
"processing": "処理中",
"refunded": "返金済み",
}
def transform_orders(
df: pl.DataFrame,
exchange_rate: float,
customer_history: pl.DataFrame,
) -> pl.DataFrame:
"""注文データの変換処理"""
return (
df.with_columns([
# 通貨変換
(pl.col("total_amount") / exchange_rate)
.round(2)
.alias("total_amount_usd"),
# ステータスマッピング
pl.col("status")
.replace(STATUS_MAP)
.alias("status_ja"),
# 日付分解
pl.col("created_at").dt.year().alias("year"),
pl.col("created_at").dt.month().alias("month"),
pl.col("created_at").dt.weekday().alias("day_of_week"),
])
# 顧客セグメント判定
.join(
customer_history.select(["customer_id", "order_count"]),
on="customer_id",
how="left",
)
.with_columns(
pl.when(pl.col("order_count").is_null() | (pl.col("order_count") == 0))
.then(pl.lit(CustomerSegment.NEW.value))
.when(pl.col("order_count") >= 10)
.then(pl.lit(CustomerSegment.VIP.value))
.otherwise(pl.lit(CustomerSegment.REPEAT.value))
.alias("customer_segment")
)
# null値処理
.with_columns([
pl.col("total_amount").fill_null(0),
pl.col("status_ja").fill_null("不明"),
])
)
Load(ロード)フェーズ
変換済みデータの書き込みもClaude Codeで効率化できます。特にBigQueryやRedshiftへのバルクロードは、適切なパーティショニングやupsert処理が求められますが、Claude Codeなら一発で生成可能です。
プロンプト例:
BigQueryへのデータロード処理を作成して。
要件:
- 日付パーティショニング(created_at)
- WRITE_TRUNCATE(日次で洗い替え)
- スキーマ自動検出
- ロード結果のログ出力
- エラー時のロールバック対応
from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPIError
import logging
logger = logging.getLogger(__name__)
class BigQueryLoader:
"""BigQueryへのデータロードを担当"""
def __init__(self, project_id: str, dataset: str) -> None:
self.client = bigquery.Client(project=project_id)
self.dataset = dataset
def load_orders(
self,
df: "pd.DataFrame",
table_name: str = "orders_fact",
partition_date: str | None = None,
) -> bigquery.LoadJob:
"""注文データをBigQueryにロード"""
table_ref = f"{self.client.project}.{self.dataset}.{table_name}"
job_config = bigquery.LoadJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
time_partitioning=bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="created_at",
),
autodetect=True,
)
if partition_date:
table_ref = f"{table_ref}${partition_date}"
try:
job = self.client.load_table_from_dataframe(
df, table_ref, job_config=job_config
)
job.result() # 完了まで待機
logger.info(
f"ロード完了: {table_ref} ({job.output_rows}行)"
)
return job
except GoogleAPIError as e:
logger.error(f"BigQueryロードエラー: {e}")
raise
Apache Airflow DAGをClaude Codeで生成する
ETLの各コンポーネントを組み合わせて、Airflow DAGとしてオーケストレーションする構成をClaude Codeに任せることも可能です。
プロンプト例:
以下のETLフローをAirflow DAGとして実装して。
1. PostgreSQLから注文データ抽出(日次)
2. 為替レートAPI取得
3. データ変換(上記のtransform_orders使用)
4. BigQueryロード
5. データ品質チェック(件数、null率)
6. Slack通知(成功/失敗)
スケジュール: 毎日JST 6:00
リトライ: 最大3回、5分間隔
SLA: 30分以内
Claude Codeが生成するDAGは、本番運用を考慮したものになります。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.trigger_rule import TriggerRule
default_args = {
"owner": "data-team",
"depends_on_past": False,
"email_on_failure": True,
"email": ["[email protected]"],
"retries": 3,
"retry_delay": timedelta(minutes=5),
"sla": timedelta(minutes=30),
}
with DAG(
dag_id="daily_orders_etl",
default_args=default_args,
description="日次注文データETLパイプライン",
schedule_interval="0 21 * * *", # UTC 21:00 = JST 6:00
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["etl", "orders", "daily"],
) as dag:
extract = PythonOperator(
task_id="extract_orders",
python_callable=extract_orders_task,
)
fetch_rate = PythonOperator(
task_id="fetch_exchange_rate",
python_callable=fetch_exchange_rate_task,
)
transform = PythonOperator(
task_id="transform_orders",
python_callable=transform_orders_task,
)
load = PythonOperator(
task_id="load_to_bigquery",
python_callable=load_to_bigquery_task,
)
quality_check = PythonOperator(
task_id="data_quality_check",
python_callable=quality_check_task,
)
notify_success = SlackWebhookOperator(
task_id="notify_success",
slack_webhook_conn_id="slack_webhook",
message="✅ 日次注文ETL完了",
trigger_rule=TriggerRule.ALL_SUCCESS,
)
notify_failure = SlackWebhookOperator(
task_id="notify_failure",
slack_webhook_conn_id="slack_webhook",
message="❌ 日次注文ETLが失敗しました",
trigger_rule=TriggerRule.ONE_FAILED,
)
[extract, fetch_rate] >> transform >> load >> quality_check
quality_check >> [notify_success, notify_failure]
リアルタイムストリーミングパイプラインの構築
バッチ処理だけでなく、リアルタイムのデータストリーミングもClaude Codeでサポートできます。Kafka ConsumerやKinesis Data Streamsとの連携コードを効率的に生成しましょう。
Kafkaストリーミング処理
プロンプト例:
Kafkaからリアルタイムで注文イベントを消費し、
集約結果をRedisに書き込むストリーミング処理を作成して。
- consumer group: order-aggregator
- トピック: order-events
- 5分ウィンドウで売上集計
- exactly-once semanticsを保証
- Graceful shutdown対応
import signal
import json
from datetime import datetime, timedelta
from collections import defaultdict
from confluent_kafka import Consumer, KafkaError
import redis
class OrderStreamAggregator:
"""リアルタイム注文集約処理"""
def __init__(
self,
kafka_config: dict,
redis_url: str,
window_minutes: int = 5,
) -> None:
self.consumer = Consumer({
**kafka_config,
"group.id": "order-aggregator",
"auto.offset.reset": "latest",
"enable.auto.commit": False,
})
self.redis = redis.from_url(redis_url)
self.window_minutes = window_minutes
self.running = True
self.window_data: dict[str, float] = defaultdict(float)
signal.signal(signal.SIGTERM, self._shutdown)
signal.signal(signal.SIGINT, self._shutdown)
def _shutdown(self, signum, frame) -> None:
self.running = False
def _get_window_key(self, timestamp: datetime) -> str:
window_start = timestamp.replace(
minute=(timestamp.minute // self.window_minutes)
* self.window_minutes,
second=0,
microsecond=0,
)
return f"sales:window:{window_start.isoformat()}"
def run(self) -> None:
self.consumer.subscribe(["order-events"])
while self.running:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
raise Exception(f"Kafka error: {msg.error()}")
continue
event = json.loads(msg.value().decode("utf-8"))
timestamp = datetime.fromisoformat(event["timestamp"])
window_key = self._get_window_key(timestamp)
# Redis上でアトミックに集計
pipe = self.redis.pipeline()
pipe.incrbyfloat(window_key, event["amount"])
pipe.expire(window_key, 3600) # 1時間でTTL
pipe.execute()
self.consumer.commit(asynchronous=False)
self.consumer.close()
データ品質チェックの自動化
パイプラインの信頼性を担保するには、データ品質チェックが不可欠です。Claude Codeを使えば、Great ExpectationsやdbtのテストをSQLから自動生成できます。
プロンプト例:
注文データに対するデータ品質チェックをGreat Expectationsで作成して。
チェック項目:
1. order_id がユニーク
2. total_amount が0以上
3. status が定義値のみ
4. created_at が過去30日以内
5. null率が各カラム5%以下
6. 前日比で件数が50%以上変動していないこと
import great_expectations as gx
def create_order_expectations(context: gx.DataContext) -> None:
"""注文データの品質期待値を定義"""
suite = context.add_expectation_suite("orders_quality")
# 1. order_id ユニーク性
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
# 2. 金額の非負制約
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="total_amount",
min_value=0,
max_value=10_000_000,
)
)
# 3. ステータスの値域制約
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeInSet(
column="status",
value_set=["pending", "completed", "cancelled",
"processing", "refunded"],
)
)
# 4. 日付の鮮度チェック
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="created_at",
min_value={"$PARAMETER": "now() - interval 30 days"},
max_value={"$PARAMETER": "now()"},
)
)
# 5. null率制約
for col in ["order_id", "customer_id", "total_amount",
"status", "created_at"]:
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(
column=col,
mostly=0.95,
)
)
context.save_expectation_suite(suite)
dbtを使ったデータ変換の管理
近年のデータパイプラインではdbt(data build tool)の採用が急速に進んでいます。Claude Codeでdbtモデルやテストを効率的に生成する方法を見ていきましょう。
プロンプト例:
注文ファクトテーブルのdbtモデルを作成して。
- ソース: raw.orders, raw.order_items, raw.customers
- 粒度: 注文行(order_item)レベル
- サロゲートキー生成
- インクリメンタルモデル
- schema.ymlのテストも含めて
-- models/marts/fct_order_items.sql
{{
config(
materialized='incremental',
unique_key='order_item_sk',
partition_by={
"field": "order_date",
"data_type": "date",
"granularity": "day"
}
)
}}
with orders as (
select * from {{ ref('stg_orders') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
),
order_items as (
select * from {{ ref('stg_order_items') }}
),
customers as (
select * from {{ ref('dim_customers') }}
)
select
{{ dbt_utils.generate_surrogate_key(
['oi.order_id', 'oi.item_id']
) }} as order_item_sk,
o.order_id,
oi.item_id,
o.customer_id,
c.customer_segment,
oi.product_id,
oi.quantity,
oi.unit_price,
oi.quantity * oi.unit_price as line_total,
o.order_date,
o.status,
o.updated_at
from order_items oi
inner join orders o on oi.order_id = o.order_id
left join customers c on o.customer_id = c.customer_id
エラーハンドリングとリトライ戦略
データパイプラインは本番環境で24時間365日稼働します。障害時のリカバリを適切に設計することが、運用コストを大きく左右します。
デッドレターキュー(DLQ)パターン
処理できなかったレコードを専用のキューに退避させ、後から再処理する設計パターンです。
from dataclasses import dataclass, field
from datetime import datetime
import json
@dataclass
class DeadLetterRecord:
"""処理失敗レコードの管理"""
original_data: dict
error_message: str
error_type: str
failed_at: datetime = field(default_factory=datetime.now)
retry_count: int = 0
max_retries: int = 3
@property
def can_retry(self) -> bool:
return self.retry_count < self.max_retries
class DeadLetterQueue:
"""デッドレターキューの実装"""
def __init__(self, storage_path: str) -> None:
self.storage_path = storage_path
self.records: list[DeadLetterRecord] = []
def push(self, record: DeadLetterRecord) -> None:
self.records.append(record)
self._persist(record)
def get_retryable(self) -> list[DeadLetterRecord]:
return [r for r in self.records if r.can_retry]
def retry_all(self, processor) -> dict[str, int]:
results = {"success": 0, "failed": 0, "skipped": 0}
for record in self.get_retryable():
try:
processor(record.original_data)
results["success"] += 1
self.records.remove(record)
except Exception:
record.retry_count += 1
results["failed"] += 1
return results
def _persist(self, record: DeadLetterRecord) -> None:
with open(self.storage_path, "a") as f:
f.write(json.dumps({
"data": record.original_data,
"error": record.error_message,
"failed_at": record.failed_at.isoformat(),
}) + "\n")
監視とアラートの設計
パイプラインの健全性を継続的に監視する仕組みも、Claude Codeで効率的に構築できます。
メトリクス収集の実装
from prometheus_client import Counter, Histogram, Gauge
import time
from functools import wraps
# メトリクス定義
RECORDS_PROCESSED = Counter(
"pipeline_records_processed_total",
"処理済みレコード数",
["stage", "status"],
)
PROCESSING_TIME = Histogram(
"pipeline_processing_seconds",
"処理時間(秒)",
["stage"],
buckets=[0.1, 0.5, 1, 5, 10, 30, 60, 300],
)
PIPELINE_LAG = Gauge(
"pipeline_lag_seconds",
"パイプラインの遅延(秒)",
["pipeline_name"],
)
def track_stage(stage_name: str):
"""パイプラインステージの計測デコレータ"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
try:
result = func(*args, **kwargs)
RECORDS_PROCESSED.labels(
stage=stage_name, status="success"
).inc()
return result
except Exception as e:
RECORDS_PROCESSED.labels(
stage=stage_name, status="error"
).inc()
raise
finally:
PROCESSING_TIME.labels(
stage=stage_name
).observe(time.time() - start)
return wrapper
return decorator
SES案件でのデータパイプライン構築の実践ポイント
SES案件でデータパイプラインを構築する際の、現場で役立つポイントをまとめます。
案件参画時のチェックリスト
- 既存のデータフロー図があるか確認する
- データソースへのアクセス権限を早期に取得する
- データ量とピーク時間帯を把握する
- SLAの定義(遅延許容時間、データ鮮度要件)を確認する
- 障害発生時のエスカレーション先を明確にする
Claude Codeで技術選定の相談
案件で使用する技術スタックの選定にもClaude Codeを活用できます。
プロンプト例:
以下の要件に最適なデータパイプラインの技術構成を提案して。
- データ量: 1日100万レコード
- リアルタイム性: 5分以内の遅延許容
- 予算: AWSで月額$500以内
- チーム: 2名(Pythonメイン)
- 既存環境: AWS、PostgreSQL
このような相談をClaude Codeに投げることで、経験豊富なアーキテクトに相談するかのように技術選定が進められます。
よくあるトラブルと対処法
| トラブル | 原因 | Claude Codeでの解決法 |
|---|---|---|
| メモリ不足 | 大量データの一括読み込み | チャンク処理への書き換えを指示 |
| 処理時間超過 | 非効率なクエリ | EXPLAINの結果を渡してチューニング依頼 |
| データ不整合 | スキーマ変更 | スキーマ検証コードの自動生成 |
| 依存関係の循環 | DAG設計ミス | DAG構造の見直しを相談 |
まとめ:Claude Codeでデータパイプライン構築を加速しよう
データパイプラインの構築は、ETL設計・バッチ処理・リアルタイム連携・品質管理と多くの要素が絡み合う複雑な作業です。しかし、Claude Codeを適切に活用すれば、各フェーズのコード生成からアーキテクチャ相談まで大幅に効率化できます。
特にSESエンジニアにとって、データパイプラインスキルは案件選択の幅を広げ、単価向上にも直結します。Claude Codeを武器に、この成長領域にぜひチャレンジしてみてください。
Claude Codeの基本的な使い方から学びたい方はClaude Code入門ガイドを、Python開発での活用法はClaude Code Python開発ガイドをご覧ください。APIの設計・実装についてはClaude Code API開発ガイド、テスト自動化はClaude Codeテスト生成ガイドが参考になります。