Apache SparkとPython:データパイプラインの構築をステップバイステップで解説

PySpark 4.0を使用したデータパイプライン構築のハンズオンチュートリアルです。DataFrame操作、ETLパイプラインの構築、Spark 4.0の新機能を網羅し、データエンジニア向けの実践的なコード例を提供します。

Apache Spark PySpark data pipeline tutorial

2026年現在、Apache Sparkは大規模データパイプラインにおける分散処理エンジンとして確固たる地位を維持しています。PySparkを組み合わせることで、Python固有のAPIを活用し、パフォーマンスを犠牲にすることなく、あらゆる規模のクラスタ上でバッチ処理とストリーミング処理の両方を実行できます。

本記事では、PySpark 4.0のPython Data Source APIやネイティブDataFrameプロット機能といった最新機能を活用しながら、生データの取り込みからクリーンな出力まで、完全なETLデータパイプラインを構築する手順を解説します。

クイックリファレンス

PySparkのDataFrameは不変(イミュータブル)です。すべての変換処理は新しいDataFrameを返し、元のDataFrameは変更されません。この設計により、Sparkの遅延評価が実現されています。変換処理は.show().write().collect()などのアクションがトリガーされるまで実行されません。

PySpark 4.0環境のセットアップ

パイプラインを構築する前に、Sparkセッションを適切に設定する必要があります。Spark 4.0ではANSIモードがデフォルトで有効になっており、より厳密なSQLセマンティクスが適用されます。数値オーバーフローが発生した場合、以前のように暗黙的にラップアラウンドするのではなく、例外がスローされるようになりました。

python
# spark_setup.py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ETLPipeline")
    .config("spark.sql.adaptive.enabled", "true")       # AQE for runtime optimization
    .config("spark.sql.shuffle.partitions", "200")       # Tune based on cluster size
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Verify Spark version
print(spark.version)  # 4.0.0

ここで有効化しているAdaptive Query Execution(AQE)は、実行時にシャッフルパーティションと結合戦略を動的に最適化する機能です。Spark 4.0では、AQEの改善とCatalystオプティマイザの強化により、一般的なETLワークロードにおいてSpark 3.x比で20〜50%の高速化が実現されています。

DataFrame APIによる生データの読み込み

PySparkのDataFrameは、分散処理を馴染みのある表形式APIの背後に抽象化します。CSV、Parquet、JSON、データベースなど、さまざまなソースからの読み込みは一貫したパターンに従います。

python
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=False),
    StructField("product_code", StringType(), nullable=True),
    StructField("amount", DoubleType(), nullable=True),
    StructField("order_date", TimestampType(), nullable=True),
    StructField("region", StringType(), nullable=True),
])

# Read CSV with explicit schema
raw_orders = (
    spark.read
    .schema(order_schema)                    # Skip inference, enforce types
    .option("header", "true")                # First row contains column names
    .option("mode", "DROPMALFORMED")         # Skip rows that don't match schema
    .csv("s3a://data-lake/raw/orders/")
)

# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")

raw_orders.printSchema()
raw_orders.show(5, truncate=False)

明示的なスキーマ定義は、本番環境におけるベストプラクティスです。スキーマ推論ではデータソース全体のフルスキャンが必要となり、大規模データセットではコストが高くなります。さらに、郵便番号を整数として解釈してしまうなど、型の誤推測が発生する可能性もあります。

DataFrameのクリーニングと変換

生データには必ずクリーニングが必要です。PySparkの変換処理はDataFrame APIを使って自然にチェーンでき、DataFrameは不変であるため、各ステップで元のDataFrameを変更することなく新しいDataFrameが生成されます。

python
# clean_orders.py
from pyspark.sql import functions as F

cleaned_orders = (
    raw_orders
    .filter(F.col("order_id").isNotNull())                     # Remove null primary keys
    .filter(F.col("amount") > 0)                               # Filter invalid amounts
    .withColumn("customer_id", F.trim(F.upper(F.col("customer_id"))))  # Normalize IDs
    .withColumn("order_date", F.to_date(F.col("order_date")))  # Cast to date type
    .withColumn("year_month",                                   # Extract partition key
        F.date_format(F.col("order_date"), "yyyy-MM"))
    .dropDuplicates(["order_id"])                               # Deduplicate by order ID
)

print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")

変換チェーンにおける処理順序は重要です。重複排除の前にcustomer_idを正規化することで、一貫したマッチングが保証されます。また、日付のパース後にyear_monthを抽出することで、パーティションカラムにnull値が混入することを防止できます。

大規模DataFrameでのcollect()使用に注意

.collect()を呼び出すと、分散されたDataFrame全体がドライバのメモリに取り込まれます。ドライバのRAMを超えるデータセットに対して実行すると、OutOfMemoryErrorが発生します。代わりに、.show().take()、またはフィルタ済みのサブセットに対する.toPandas()を使用してください。

複数ソースの結合と集約

実際のパイプラインでは、複数のソースからデータを結合します。PySparkはすべての標準的な結合タイプをサポートしており、Spark 4.0のAQEは小さなテーブルに対してブロードキャスト結合を自動的に選択するため、手動でのヒント指定が不要になりました。

python
# enrich_orders.py
# Join orders with product catalog
enriched = (
    cleaned_orders
    .join(products, on="product_code", how="left")      # Keep all orders, even unmatched
    .select(
        "order_id",
        "customer_id",
        "product_code",
        F.col("name").alias("product_name"),              # Rename for clarity
        "amount",
        F.col("category"),                                # From products table
        "order_date",
        "year_month",
    )
)

# Aggregate: monthly revenue per product category
monthly_revenue = (
    enriched
    .groupBy("year_month", "category")
    .agg(
        F.sum("amount").alias("total_revenue"),            # Sum all order amounts
        F.countDistinct("order_id").alias("order_count"),  # Unique orders
        F.avg("amount").alias("avg_order_value"),          # Average basket size
    )
    .orderBy(F.desc("total_revenue"))
)

monthly_revenue.show(10)

left結合を使用すると、商品カタログに一致するものがない場合でもすべての注文が保持されます。これはデータ移行中や、商品カタログがトランザクションシステムに追いついていない場合によく発生するシナリオです。

Data Engineeringの面接対策はできていますか?

インタラクティブなシミュレーター、flashcards、技術テストで練習しましょう。

パーティショニングによる最適化された出力の書き込み

ETLパイプラインの最終ステップは、変換済みデータをターゲットストレージレイヤーに書き込むことです。頻繁にクエリされるカラムで出力をパーティショニングすることで、下流の利用者の読み取り時間を大幅に短縮できます。

python
# write_output.py
# Write enriched data partitioned by year_month
(
    enriched
    .repartition("year_month")                             # Align partitions with output
    .write
    .mode("overwrite")                                     # Replace existing partition data
    .partitionBy("year_month")                             # Physical directory partitioning
    .parquet("s3a://data-lake/curated/enriched_orders/")
)

# Write aggregated metrics as a single compact file
(
    monthly_revenue
    .coalesce(1)                                           # Single output file for small results
    .write
    .mode("overwrite")
    .parquet("s3a://data-lake/curated/monthly_revenue/")
)

print("Pipeline complete — output written to curated layer")

.partitionBy()の前に.repartition()を使用することで、各物理パーティションに適切な数のファイルが含まれるようになります。これを行わないと、Sparkはパーティションごとに数百の小さなファイルを生成する可能性があります。これは「スモールファイル問題」として知られるアンチパターンであり、HDFSやオブジェクトストアでの読み取りパフォーマンスを大幅に低下させます。

Spark 4.0:カスタムコネクタ用Python Data Source API

Spark 4.0では、Python Data Source APIが導入され、カスタムコネクタをJavaやScalaで記述する必要がなくなりました。これにより、独自システム、REST API、ニッチなファイル形式との統合が簡素化されます。

python
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class APIDataSource(DataSource):
    """Custom data source reading from an internal REST API."""

    @classmethod
    def name(cls) -> str:
        return "internal_api"                              # Registered source name

    def schema(self) -> StructType:
        return StructType([                                # Define output schema
            StructField("id", IntegerType()),
            StructField("name", StringType()),
            StructField("status", StringType()),
        ])

    def reader(self, schema: StructType) -> "APIDataSourceReader":
        return APIDataSourceReader(self.options)           # Pass options to reader

class APIDataSourceReader(DataSourceReader):
    def __init__(self, options):
        self.endpoint = options.get("endpoint", "")        # API endpoint URL

    def read(self, partition):
        import requests                                    # Import inside read for serialization
        response = requests.get(self.endpoint)
        for record in response.json():
            yield (record["id"], record["name"], record["status"])

# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()

Python Data Source APIはバッチ読み取りとストリーミング読み取りの両方をサポートしています。本番環境での使用においては、read()内にエラーハンドリングを追加し、パーティションレベルの並列処理を実装することで、スループットを大幅に改善できます。

Spark 4.0のSpark Connect

Spark Connectは、クライアントアプリケーションをSparkクラスタから分離します。軽量なPySparkクライアント(pyspark-client)はわずか1.5 MBで、完全なPySparkパッケージの355 MBと比較して大幅に軽量です。これにより、ローカルにSparkをインストールすることなく、あらゆるIDEやノートブックからSparkジョブを実行できるようになります。

PySparkパイプラインのパフォーマンスチューニングチェックリスト

動作するパイプラインを構築することは課題の一部に過ぎません。大規模環境でのパフォーマンスを確保するには、パーティショニング、キャッシュ、結合戦略に注意を払う必要があります。

| テクニック | 適用タイミング | 効果 | |-----------|---------------|--------| | 明示的スキーマ | すべての読み取り操作 | スキーマ推論スキャンを排除 | | パーティションプルーニング | パーティション化されたデータセットのフィルタリング時 | 無関係なデータパーティションを完全にスキップ | | ブロードキャスト結合 | 小さなテーブル(10 MB未満)と大きなテーブルの結合 | コストの高いシャッフルを回避 | | キャッシュ | 複数のアクションでDataFrameを再利用する場合 | 再計算を防止 | | CoalesceとRepartition | パーティション数を削減する場合 | coalesceはフルシャッフルを回避 | | AQE | 常時(Spark 4.0ではデフォルト) | 結合とシャッフルの実行時最適化 |

Spark UIを通じたパイプラインパフォーマンスの監視は依然として不可欠です。DAGの可視化によりシャッフル境界が明らかになり、SQLタブではCatalystオプティマイザが選択した物理プランを確認できます。

まとめ

  • PySpark 4.0は、ANSIモードのSQLコンプライアンスをデフォルトで有効にした、あらゆる規模でETLパイプラインを構築するための成熟したPythonファーストのAPIを提供します
  • 明示的なスキーマ定義、適切なパーティショニング、AQEの活用により、本番環境のパイプラインにおける最も一般的なパフォーマンスボトルネックを解消できます
  • 新しいPython Data Source APIにより、カスタムコネクタのJava/Scalaの壁が取り除かれました。REST API、独自フォーマット、ストリーミングソースのすべてを純粋なPythonで統合できます
  • 変換の順序は重要です。重複排除の前に識別子を正規化し、パーティションキーの抽出前に日付をパースする必要があります
  • データエンジニアリング面接対策では、遅延評価、シャッフルメカニクス、パーティション戦略といったSparkの内部構造が頻繁に取り上げられます。これらのパイプラインの基礎を理解することは、面接準備に直結します
  • パイプラインアーキテクチャの設計判断については、ETL・ELTパターンで詳しく解説しています

今すぐ練習を始めましょう!

面接シミュレーターと技術テストで知識をテストしましょう。

タグ

#Apache Spark
#PySpark
#Data Engineering
#ETL
#Python

共有

関連記事