𝕏 f B! L
案件・求人数 12,345
案件を探す(準備中) エージェントを探す(準備中) お役立ち情報 ログイン
案件・求人数 12,345
Claude Codeでデータパイプラインを構築する方法|ETL設計・バッチ処理・リアルタイム連携ガイド

Claude Codeでデータパイプラインを構築する方法|ETL設計・バッチ処理・リアルタイム連携ガイド

Claude CodeデータパイプラインETLバッチ処理AI開発ツール
目次

「データの取り込みから加工・集約まで、パイプライン構築に毎回膨大な時間がかかる」——データエンジニアリングに携わるSESエンジニアなら、一度は感じたことがある悩みでしょう。

結論から言えば、Claude Codeを活用することでETL設計からバッチ処理、リアルタイムストリーミングまで、データパイプライン構築の工数を大幅に短縮できます。本記事では、実際の開発フローに沿って具体的なプロンプト例とともに解説します。

この記事を3秒でまとめると

  • Claude CodeはETL設計・スキーマ定義・データ変換ロジックの生成に強い
  • CLAUDE.mdにデータソース情報を記述することでコンテキスト精度が上がる
  • バッチ処理からリアルタイムストリーミングまで一貫してAIで効率化できる

Claude Codeデータパイプライン構築の全体像

データパイプライン構築でClaude Codeが威力を発揮する理由

データパイプラインの構築は、単にコードを書くだけでは終わりません。データソースの特性理解、スキーマ設計、変換ロジック、エラーハンドリング、リトライ戦略——多くの要素が複雑に絡み合います。

Claude Codeがこの領域で力を発揮する理由は、プロジェクト全体を俯瞰してコードを生成できる点にあります。CSVファイルの構造、DBテーブルの定義、APIレスポンスのスキーマをすべて把握した上で、適切な変換処理を提案してくれるのです。

Claude Codeが得意なデータパイプラインタスク

タスク具体的な活用法効率化の度合い
ETL設計抽出・変換・ロードの各ステップのコード生成工数70%削減
スキーマ定義データソースからのスキーマ自動推論工数60%削減
バッチ処理cron/スケジューラ連携のジョブ設計工数50%削減
データ品質チェックバリデーションルールの自動生成工数80%削減
エラーハンドリングリトライ・デッドレターキュー設計工数55%削減

なぜSESエンジニアにデータパイプラインスキルが求められるのか

SES市場において、データエンジニアリング案件の需要は年々増加しています。従来の「アプリケーション開発」中心の案件から、データ基盤構築・データ分析基盤の整備を求める案件が急増しているのです。

具体的には以下のような案件が増えています。

  • データレイク/データウェアハウス構築: S3 + Glue + Redshift、BigQuery連携
  • リアルタイムデータ処理: Kafka + Flink、Kinesis + Lambda
  • MLパイプライン: 特徴量エンジニアリング + モデル学習パイプライン
  • データ品質管理: Great Expectations、dbt testの導入

Claude Codeを使いこなせれば、こうした案件への参画もスムーズになります。

CLAUDE.mdでデータパイプラインの文脈を伝える

Claude Codeの精度を最大限に引き出すには、CLAUDE.mdファイルにプロジェクトの文脈を適切に記述することが重要です。データパイプラインプロジェクトでは、以下の情報を含めると効果的です。

# CLAUDE.md - データパイプラインプロジェクト

## プロジェクト概要
ECサイトの注文データを集約するETLパイプライン

## データソース
- PostgreSQL (注文DB): orders, order_items, customers テーブル
- REST API (在庫管理): GET /api/v1/inventory
- S3バケット (CSVアップロード): s3://raw-data/daily-reports/

## 出力先
- BigQuery: analytics.orders_fact, analytics.customers_dim
- Redshift: warehouse.daily_summary

## 技術スタック
- Python 3.12 + Apache Airflow 2.8
- pandas / polars でデータ変換
- SQLAlchemy でDB接続
- boto3 でAWS連携

## コーディング規約
- 型ヒント必須 (mypy strict)
- docstring は Google style
- テストは pytest + fixtures

この情報があるだけで、Claude Codeの提案精度が格段に向上します。「orders テーブルから月次集計を出して」と指示するだけで、適切なJOINとGROUP BYを含むクエリを生成してくれるようになります。

ETLパイプラインの設計をClaude Codeで効率化する

Extract(抽出)フェーズ

データ抽出では、複数のソースからデータを取得するコードを書く必要があります。Claude Codeを使えば、各ソースに応じた接続・取得ロジックを素早く生成できます。

プロンプト例:

PostgreSQLのordersテーブルから前日分のデータを抽出するExtractorクラスを作成して。
要件:
- SQLAlchemy使用
- 日付範囲指定で抽出
- バッチサイズ10,000件ずつ取得
- 接続エラー時は3回リトライ
- 抽出件数をログ出力

Claude Codeはこのプロンプトから、以下のような堅牢な抽出コードを生成します。

import logging
from datetime import date, timedelta
from typing import Iterator

import pandas as pd
from sqlalchemy import create_engine, text
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)


class OrderExtractor:
    """注文データの抽出を担当するクラス"""

    def __init__(self, connection_url: str, batch_size: int = 10_000) -> None:
        self.engine = create_engine(connection_url)
        self.batch_size = batch_size

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=30),
    )
    def extract(
        self,
        start_date: date | None = None,
        end_date: date | None = None,
    ) -> Iterator[pd.DataFrame]:
        """指定日付範囲の注文データをバッチで抽出する"""
        if start_date is None:
            start_date = date.today() - timedelta(days=1)
        if end_date is None:
            end_date = date.today()

        query = text("""
            SELECT o.id, o.customer_id, o.total_amount,
                   o.status, o.created_at
            FROM orders o
            WHERE o.created_at >= :start_date
              AND o.created_at < :end_date
            ORDER BY o.created_at
        """)

        total_rows = 0
        with self.engine.connect() as conn:
            for chunk in pd.read_sql(
                query,
                conn,
                params={"start_date": start_date, "end_date": end_date},
                chunksize=self.batch_size,
            ):
                total_rows += len(chunk)
                logger.info(f"抽出済み: {total_rows}件")
                yield chunk

        logger.info(f"抽出完了: 合計{total_rows}件")

Transform(変換)フェーズ

データ変換は最も複雑になりやすいフェーズです。Claude Codeに変換ルールを伝えることで、正確な変換ロジックを生成できます。

プロンプト例:

注文データの変換処理を作成して。
変換ルール:
1. 金額をJPYからUSDに変換(為替レートはAPIから取得)
2. ステータスを英語から日本語にマッピング(pending→未処理, completed→完了, cancelled→キャンセル)
3. 顧客IDから顧客セグメント(新規/リピーター/VIP)を判定
4. 日付カラムからyear, month, day_of_weekを抽出
5. null値のデフォルト処理
polarsを使って高速に処理すること。

Claude Codeは型安全で高速なpolarsベースの変換コードを生成してくれます。

import polars as pl
from enum import Enum


class CustomerSegment(str, Enum):
    NEW = "新規"
    REPEAT = "リピーター"
    VIP = "VIP"


STATUS_MAP = {
    "pending": "未処理",
    "completed": "完了",
    "cancelled": "キャンセル",
    "processing": "処理中",
    "refunded": "返金済み",
}


def transform_orders(
    df: pl.DataFrame,
    exchange_rate: float,
    customer_history: pl.DataFrame,
) -> pl.DataFrame:
    """注文データの変換処理"""
    return (
        df.with_columns([
            # 通貨変換
            (pl.col("total_amount") / exchange_rate)
                .round(2)
                .alias("total_amount_usd"),
            # ステータスマッピング
            pl.col("status")
                .replace(STATUS_MAP)
                .alias("status_ja"),
            # 日付分解
            pl.col("created_at").dt.year().alias("year"),
            pl.col("created_at").dt.month().alias("month"),
            pl.col("created_at").dt.weekday().alias("day_of_week"),
        ])
        # 顧客セグメント判定
        .join(
            customer_history.select(["customer_id", "order_count"]),
            on="customer_id",
            how="left",
        )
        .with_columns(
            pl.when(pl.col("order_count").is_null() | (pl.col("order_count") == 0))
              .then(pl.lit(CustomerSegment.NEW.value))
              .when(pl.col("order_count") >= 10)
              .then(pl.lit(CustomerSegment.VIP.value))
              .otherwise(pl.lit(CustomerSegment.REPEAT.value))
              .alias("customer_segment")
        )
        # null値処理
        .with_columns([
            pl.col("total_amount").fill_null(0),
            pl.col("status_ja").fill_null("不明"),
        ])
    )

Load(ロード)フェーズ

変換済みデータの書き込みもClaude Codeで効率化できます。特にBigQueryやRedshiftへのバルクロードは、適切なパーティショニングやupsert処理が求められますが、Claude Codeなら一発で生成可能です。

プロンプト例:

BigQueryへのデータロード処理を作成して。
要件:
- 日付パーティショニング(created_at)
- WRITE_TRUNCATE(日次で洗い替え)
- スキーマ自動検出
- ロード結果のログ出力
- エラー時のロールバック対応
from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPIError
import logging

logger = logging.getLogger(__name__)


class BigQueryLoader:
    """BigQueryへのデータロードを担当"""

    def __init__(self, project_id: str, dataset: str) -> None:
        self.client = bigquery.Client(project=project_id)
        self.dataset = dataset

    def load_orders(
        self,
        df: "pd.DataFrame",
        table_name: str = "orders_fact",
        partition_date: str | None = None,
    ) -> bigquery.LoadJob:
        """注文データをBigQueryにロード"""
        table_ref = f"{self.client.project}.{self.dataset}.{table_name}"

        job_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
            time_partitioning=bigquery.TimePartitioning(
                type_=bigquery.TimePartitioningType.DAY,
                field="created_at",
            ),
            autodetect=True,
        )

        if partition_date:
            table_ref = f"{table_ref}${partition_date}"

        try:
            job = self.client.load_table_from_dataframe(
                df, table_ref, job_config=job_config
            )
            job.result()  # 完了まで待機
            logger.info(
                f"ロード完了: {table_ref} ({job.output_rows}行)"
            )
            return job
        except GoogleAPIError as e:
            logger.error(f"BigQueryロードエラー: {e}")
            raise

Apache Airflow DAGをClaude Codeで生成する

ETLの各コンポーネントを組み合わせて、Airflow DAGとしてオーケストレーションする構成をClaude Codeに任せることも可能です。

プロンプト例:

以下のETLフローをAirflow DAGとして実装して。
1. PostgreSQLから注文データ抽出(日次)
2. 為替レートAPI取得
3. データ変換(上記のtransform_orders使用)
4. BigQueryロード
5. データ品質チェック(件数、null率)
6. Slack通知(成功/失敗)

スケジュール: 毎日JST 6:00
リトライ: 最大3回、5分間隔
SLA: 30分以内

Claude Codeが生成するDAGは、本番運用を考慮したものになります。

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["[email protected]"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(minutes=30),
}

with DAG(
    dag_id="daily_orders_etl",
    default_args=default_args,
    description="日次注文データETLパイプライン",
    schedule_interval="0 21 * * *",  # UTC 21:00 = JST 6:00
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl", "orders", "daily"],
) as dag:

    extract = PythonOperator(
        task_id="extract_orders",
        python_callable=extract_orders_task,
    )

    fetch_rate = PythonOperator(
        task_id="fetch_exchange_rate",
        python_callable=fetch_exchange_rate_task,
    )

    transform = PythonOperator(
        task_id="transform_orders",
        python_callable=transform_orders_task,
    )

    load = PythonOperator(
        task_id="load_to_bigquery",
        python_callable=load_to_bigquery_task,
    )

    quality_check = PythonOperator(
        task_id="data_quality_check",
        python_callable=quality_check_task,
    )

    notify_success = SlackWebhookOperator(
        task_id="notify_success",
        slack_webhook_conn_id="slack_webhook",
        message="✅ 日次注文ETL完了",
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    notify_failure = SlackWebhookOperator(
        task_id="notify_failure",
        slack_webhook_conn_id="slack_webhook",
        message="❌ 日次注文ETLが失敗しました",
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    [extract, fetch_rate] >> transform >> load >> quality_check
    quality_check >> [notify_success, notify_failure]

リアルタイムストリーミングパイプラインの構築

バッチ処理だけでなく、リアルタイムのデータストリーミングもClaude Codeでサポートできます。Kafka ConsumerやKinesis Data Streamsとの連携コードを効率的に生成しましょう。

Kafkaストリーミング処理

プロンプト例:

Kafkaからリアルタイムで注文イベントを消費し、
集約結果をRedisに書き込むストリーミング処理を作成して。
- consumer group: order-aggregator
- トピック: order-events
- 5分ウィンドウで売上集計
- exactly-once semanticsを保証
- Graceful shutdown対応
import signal
import json
from datetime import datetime, timedelta
from collections import defaultdict

from confluent_kafka import Consumer, KafkaError
import redis

class OrderStreamAggregator:
    """リアルタイム注文集約処理"""

    def __init__(
        self,
        kafka_config: dict,
        redis_url: str,
        window_minutes: int = 5,
    ) -> None:
        self.consumer = Consumer({
            **kafka_config,
            "group.id": "order-aggregator",
            "auto.offset.reset": "latest",
            "enable.auto.commit": False,
        })
        self.redis = redis.from_url(redis_url)
        self.window_minutes = window_minutes
        self.running = True
        self.window_data: dict[str, float] = defaultdict(float)

        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)

    def _shutdown(self, signum, frame) -> None:
        self.running = False

    def _get_window_key(self, timestamp: datetime) -> str:
        window_start = timestamp.replace(
            minute=(timestamp.minute // self.window_minutes)
            * self.window_minutes,
            second=0,
            microsecond=0,
        )
        return f"sales:window:{window_start.isoformat()}"

    def run(self) -> None:
        self.consumer.subscribe(["order-events"])

        while self.running:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    raise Exception(f"Kafka error: {msg.error()}")
                continue

            event = json.loads(msg.value().decode("utf-8"))
            timestamp = datetime.fromisoformat(event["timestamp"])
            window_key = self._get_window_key(timestamp)

            # Redis上でアトミックに集計
            pipe = self.redis.pipeline()
            pipe.incrbyfloat(window_key, event["amount"])
            pipe.expire(window_key, 3600)  # 1時間でTTL
            pipe.execute()

            self.consumer.commit(asynchronous=False)

        self.consumer.close()

データ品質チェックの自動化

パイプラインの信頼性を担保するには、データ品質チェックが不可欠です。Claude Codeを使えば、Great ExpectationsやdbtのテストをSQLから自動生成できます。

プロンプト例:

注文データに対するデータ品質チェックをGreat Expectationsで作成して。
チェック項目:
1. order_id がユニーク
2. total_amount が0以上
3. status が定義値のみ
4. created_at が過去30日以内
5. null率が各カラム5%以下
6. 前日比で件数が50%以上変動していないこと
import great_expectations as gx

def create_order_expectations(context: gx.DataContext) -> None:
    """注文データの品質期待値を定義"""
    suite = context.add_expectation_suite("orders_quality")

    # 1. order_id ユニーク性
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
    )

    # 2. 金額の非負制約
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column="total_amount",
            min_value=0,
            max_value=10_000_000,
        )
    )

    # 3. ステータスの値域制約
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeInSet(
            column="status",
            value_set=["pending", "completed", "cancelled",
                       "processing", "refunded"],
        )
    )

    # 4. 日付の鮮度チェック
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column="created_at",
            min_value={"$PARAMETER": "now() - interval 30 days"},
            max_value={"$PARAMETER": "now()"},
        )
    )

    # 5. null率制約
    for col in ["order_id", "customer_id", "total_amount",
                "status", "created_at"]:
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToNotBeNull(
                column=col,
                mostly=0.95,
            )
        )

    context.save_expectation_suite(suite)

dbtを使ったデータ変換の管理

近年のデータパイプラインではdbt(data build tool)の採用が急速に進んでいます。Claude Codeでdbtモデルやテストを効率的に生成する方法を見ていきましょう。

プロンプト例:

注文ファクトテーブルのdbtモデルを作成して。
- ソース: raw.orders, raw.order_items, raw.customers
- 粒度: 注文行(order_item)レベル
- サロゲートキー生成
- インクリメンタルモデル
- schema.ymlのテストも含めて
-- models/marts/fct_order_items.sql
{{
  config(
    materialized='incremental',
    unique_key='order_item_sk',
    partition_by={
      "field": "order_date",
      "data_type": "date",
      "granularity": "day"
    }
  )
}}

with orders as (
    select * from {{ ref('stg_orders') }}
    {% if is_incremental() %}
    where updated_at > (select max(updated_at) from {{ this }})
    {% endif %}
),

order_items as (
    select * from {{ ref('stg_order_items') }}
),

customers as (
    select * from {{ ref('dim_customers') }}
)

select
    {{ dbt_utils.generate_surrogate_key(
        ['oi.order_id', 'oi.item_id']
    ) }} as order_item_sk,
    o.order_id,
    oi.item_id,
    o.customer_id,
    c.customer_segment,
    oi.product_id,
    oi.quantity,
    oi.unit_price,
    oi.quantity * oi.unit_price as line_total,
    o.order_date,
    o.status,
    o.updated_at
from order_items oi
inner join orders o on oi.order_id = o.order_id
left join customers c on o.customer_id = c.customer_id

エラーハンドリングとリトライ戦略

データパイプラインは本番環境で24時間365日稼働します。障害時のリカバリを適切に設計することが、運用コストを大きく左右します。

デッドレターキュー(DLQ)パターン

処理できなかったレコードを専用のキューに退避させ、後から再処理する設計パターンです。

from dataclasses import dataclass, field
from datetime import datetime
import json


@dataclass
class DeadLetterRecord:
    """処理失敗レコードの管理"""
    original_data: dict
    error_message: str
    error_type: str
    failed_at: datetime = field(default_factory=datetime.now)
    retry_count: int = 0
    max_retries: int = 3

    @property
    def can_retry(self) -> bool:
        return self.retry_count < self.max_retries


class DeadLetterQueue:
    """デッドレターキューの実装"""

    def __init__(self, storage_path: str) -> None:
        self.storage_path = storage_path
        self.records: list[DeadLetterRecord] = []

    def push(self, record: DeadLetterRecord) -> None:
        self.records.append(record)
        self._persist(record)

    def get_retryable(self) -> list[DeadLetterRecord]:
        return [r for r in self.records if r.can_retry]

    def retry_all(self, processor) -> dict[str, int]:
        results = {"success": 0, "failed": 0, "skipped": 0}
        for record in self.get_retryable():
            try:
                processor(record.original_data)
                results["success"] += 1
                self.records.remove(record)
            except Exception:
                record.retry_count += 1
                results["failed"] += 1
        return results

    def _persist(self, record: DeadLetterRecord) -> None:
        with open(self.storage_path, "a") as f:
            f.write(json.dumps({
                "data": record.original_data,
                "error": record.error_message,
                "failed_at": record.failed_at.isoformat(),
            }) + "\n")

監視とアラートの設計

パイプラインの健全性を継続的に監視する仕組みも、Claude Codeで効率的に構築できます。

メトリクス収集の実装

from prometheus_client import Counter, Histogram, Gauge
import time
from functools import wraps

# メトリクス定義
RECORDS_PROCESSED = Counter(
    "pipeline_records_processed_total",
    "処理済みレコード数",
    ["stage", "status"],
)
PROCESSING_TIME = Histogram(
    "pipeline_processing_seconds",
    "処理時間(秒)",
    ["stage"],
    buckets=[0.1, 0.5, 1, 5, 10, 30, 60, 300],
)
PIPELINE_LAG = Gauge(
    "pipeline_lag_seconds",
    "パイプラインの遅延(秒)",
    ["pipeline_name"],
)


def track_stage(stage_name: str):
    """パイプラインステージの計測デコレータ"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = func(*args, **kwargs)
                RECORDS_PROCESSED.labels(
                    stage=stage_name, status="success"
                ).inc()
                return result
            except Exception as e:
                RECORDS_PROCESSED.labels(
                    stage=stage_name, status="error"
                ).inc()
                raise
            finally:
                PROCESSING_TIME.labels(
                    stage=stage_name
                ).observe(time.time() - start)
        return wrapper
    return decorator

SES案件でのデータパイプライン構築の実践ポイント

SES案件でデータパイプラインを構築する際の、現場で役立つポイントをまとめます。

案件参画時のチェックリスト

  1. 既存のデータフロー図があるか確認する
  2. データソースへのアクセス権限を早期に取得する
  3. データ量とピーク時間帯を把握する
  4. SLAの定義(遅延許容時間、データ鮮度要件)を確認する
  5. 障害発生時のエスカレーション先を明確にする

Claude Codeで技術選定の相談

案件で使用する技術スタックの選定にもClaude Codeを活用できます。

プロンプト例:

以下の要件に最適なデータパイプラインの技術構成を提案して。
- データ量: 1日100万レコード
- リアルタイム性: 5分以内の遅延許容
- 予算: AWSで月額$500以内
- チーム: 2名(Pythonメイン)
- 既存環境: AWS、PostgreSQL

このような相談をClaude Codeに投げることで、経験豊富なアーキテクトに相談するかのように技術選定が進められます。

よくあるトラブルと対処法

トラブル原因Claude Codeでの解決法
メモリ不足大量データの一括読み込みチャンク処理への書き換えを指示
処理時間超過非効率なクエリEXPLAINの結果を渡してチューニング依頼
データ不整合スキーマ変更スキーマ検証コードの自動生成
依存関係の循環DAG設計ミスDAG構造の見直しを相談

まとめ:Claude Codeでデータパイプライン構築を加速しよう

データパイプラインの構築は、ETL設計・バッチ処理・リアルタイム連携・品質管理と多くの要素が絡み合う複雑な作業です。しかし、Claude Codeを適切に活用すれば、各フェーズのコード生成からアーキテクチャ相談まで大幅に効率化できます。

特にSESエンジニアにとって、データパイプラインスキルは案件選択の幅を広げ、単価向上にも直結します。Claude Codeを武器に、この成長領域にぜひチャレンジしてみてください。

Claude Codeの基本的な使い方から学びたい方はClaude Code入門ガイドを、Python開発での活用法はClaude Code Python開発ガイドをご覧ください。APIの設計・実装についてはClaude Code API開発ガイド、テスト自動化はClaude Codeテスト生成ガイドが参考になります。

SES案件をお探しですか?

SES記事をもっと読む →
🏗️

SES BASE 編集長

SES業界歴10年以上のメンバーが在籍する編集チーム。SES企業での営業・エンジニア経験、フリーランス独立経験を持つメンバーが、業界のリアルな情報をお届けします。

📊 業界データに基づく記事制作 🔍 IPA・経済産業省データ参照 💼 SES実務経験者が執筆・監修