Apache Spark 4 完全ガイド 2026年版:新機能、Structured Streaming、面接対策

Apache Spark 4の主要な新機能を詳しく解説します。ANSI SQLモード、VARIANT型、リアルタイムストリーミング、Spark Connectなど、データエンジニアリング面接で頻出のトピックを網羅的にカバーしています。

Apache Spark 4のデータエンジニアリングStructured Streamingパイプラインの解説図

Apache Spark 4は、バージョン3.0以来最大規模のアーキテクチャ刷新を遂げたリリースです。2025年半ばの正式リリース後、4.1および4.2と急速に進化を続けており、データエンジニアがパイプラインを構築し、ストリーミングワークロードを実行し、大規模SQLを処理する方法を根本的に変えています。本記事では、主要な変更点をPySparkとScalaのコードで実演し、採用チームが最も頻繁に問うデータエンジニアリング面接の質問を網羅します。

Spark 4 リリース経緯

Spark 4.0.0は2025年5月に390名以上のコントリビューターによる5,100件超のチケット解決とともに出荷されました。Spark 4.1.0は2025年12月にリリースされ、リアルタイムモードストリーミングと宣言的パイプラインが追加されました。Spark 4.2のプレビュービルドは2026年2月に開始されています。

Spark 4でANSI SQLモードがデフォルト有効化

Spark 4.0ではspark.sql.ansi.enabledがデフォルトでtrueに設定されました。従来は暗黙的にNULLを返すか、値を切り詰めていた算術オーバーフロー、無効なキャスト、ゼロ除算のすべてが例外をスローするようになります。その目的は、PostgreSQLやOracleの動作に一致する厳格なSQL標準準拠です。

実務への影響はETLパイプラインで最も顕著に現れます。暗黙的な型変換やサイレントなNULL伝播に依存していたクエリは、明示的に処理しない限り実行時に失敗します。

python
# ansi_mode_migration.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, try_cast

spark = SparkSession.builder.appName("ANSIMigration").getOrCreate()

# This now throws ArithmeticException in Spark 4 ANSI mode
# SELECT CAST('abc' AS INT)  -- runtime error

# Safe migration: use TRY_CAST to preserve NULL behavior
df = spark.sql("""
    SELECT
        TRY_CAST(raw_value AS INT) AS parsed_value,
        TRY_CAST(amount AS DECIMAL(10,2)) AS safe_amount
    FROM raw_events
    WHERE TRY_CAST(raw_value AS INT) IS NOT NULL
""")

# Alternatively, wrap arithmetic in try_divide / try_add
result = df.withColumn(
    "ratio",
    col("safe_amount") / col("parsed_value")  # throws on zero
)

移行チェックリストとしては、すべてのCAST呼び出しを監査し、NULL安全な動作が期待される箇所ではTRY_CASTに置き換え、数値カラムに対する明示的なオーバーフローチェックを追加することが推奨されます。

VARIANT型による半構造化データ処理

Spark 4.0では、JSONライクな半構造化データ向けにネイティブのVARIANTカラム型が追加されました。生のJSON文字列を格納してクエリ時にパースする従来の方法とは異なり、VARIANTはカラムシュレッディングを備えた最適化バイナリ形式を使用し、ネストされたフィールドアクセスで最大8倍高速な読み取りを実現します。

VariantExample.scalascala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("VariantDemo").getOrCreate()
import spark.implicits._

// Create a table with VARIANT column
spark.sql("""
    CREATE TABLE events (
        event_id BIGINT,
        timestamp TIMESTAMP,
        payload VARIANT
    ) USING PARQUET
""")

// Insert JSON data — automatically converted to VARIANT
spark.sql("""
    INSERT INTO events VALUES
    (1, current_timestamp(),
     PARSE_JSON('{"user_id": 42, "action": "click", "metadata": {"page": "/home", "duration_ms": 320}}')),
    (2, current_timestamp(),
     PARSE_JSON('{"user_id": 87, "action": "purchase", "metadata": {"item_id": "SKU-9001", "amount": 49.99}}'))
""")

// Query nested fields with dot notation — uses shredding for fast access
val clicks = spark.sql("""
    SELECT
        event_id,
        payload:user_id::INT AS user_id,
        payload:action::STRING AS action,
        payload:metadata.page::STRING AS page
    FROM events
    WHERE payload:action::STRING = 'click'
""")
clicks.show()

VARIANTは、スキーマオンリードの回避策を不要にします。イベントスキーマが頻繁に変化するデータレイクアーキテクチャでは、単一のVARIANTカラムがDDLマイグレーションなしにスキーマ変更を吸収します。Spark 4.1ではParquetレベルでのシュレッディングが拡張され、選択的フィールドアクセスのIOがさらに削減されました。

Spark 4.1のリアルタイムモードストリーミング

Spark 4.1のStructured Streamingにはリアルタイムモード(RTM)が導入されました。これはSparkにおけるサブ秒レイテンシの初の公式サポートです。ステートレスワークロードではレイテンシが1桁ミリ秒まで低下し、Apache Flinkの得意分野で直接競合します。

これを可能にする3つのアーキテクチャ変更があります。連続データフロー(マイクロバッチ境界なし)、パイプラインスケジューリング(読み取り/計算/書き込みステージのオーバーラップ)、ストリーミングシャッフル(バッチ間のチェックポイントではなくメモリ内に状態を格納)です。

python
# real_time_streaming.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("RTMDemo") \
    .config("spark.sql.streaming.mode", "realtime") \
    .getOrCreate()

schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("temperature", DoubleType()),
    StructField("event_time", TimestampType())
])

# Read from Kafka with Real-Time Mode enabled
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "sensor-readings") \
    .option("startingOffsets", "latest") \
    .load()

parsed = raw_stream \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# Write to Kafka sink — sub-second end-to-end
query = parsed \
    .filter(col("temperature") > 80.0) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "alerts") \
    .option("checkpointLocation", "/tmp/checkpoints/alerts") \
    .start()

query.awaitTermination()

RTMの有効化には単一の設定変更のみが必要です:spark.sql.streaming.mode = realtime。コードの書き換えは不要です。Spark 4.1では、KafkaソースおよびKafka/Foreachシンクを使用するステートレスの単一ステージScalaおよびPySparkクエリから対応を開始しています。ステートフルRTMサポートはSpark 4.2で予定されています。

Spark Connectと軽量PySparkクライアント

Spark 4.0では、わずか1.5MBのスタンドアロンpyspark-clientパッケージが出荷されています。これはフルPySparkディストリビューションの300MB超と比較して極めて軽量です。この軽量クライアントはSpark Connectを通じてgRPC経由でリモートSparkクラスタと通信し、ノートブック環境やCIパイプラインがフルランタイムをバンドルせずにSparkジョブを実行できるようにします。

python
# spark_connect_client.py
from pyspark.sql import SparkSession

# Connect to remote Spark cluster — no local JVM needed
spark = SparkSession.builder \
    .remote("sc://spark-cluster.internal:15002") \
    .getOrCreate()

# Full DataFrame API available through thin client
df = spark.read.parquet("s3a://data-lake/events/2026/")

aggregated = df.groupBy("country", "event_type") \
    .count() \
    .orderBy("count", ascending=False)

aggregated.show(20)

Spark ConnectはJavaおよびSwiftクライアントもサポートしています。spark.api.mode設定により、従来の組み込みモードとConnectモードをコード変更なしに切り替えられます。データエンジニアリングチームにとって、これはクライアント環境をクラスタのアップグレードから分離することを意味します。サーバーを更新しても、クライアントは引き続き動作します。

Data Engineeringの面接対策はできていますか?

インタラクティブなシミュレーター、flashcards、技術テストで練習しましょう。

transformWithState API:複雑なストリーミングロジック向け

Spark 4.0では、制限のあったmapGroupsWithStateに代わりtransformWithStateが導入されました。複合状態型、TTLベースの有効期限、イベント駆動タイマーをサポートする柔軟なAPIです。

SessionTracker.scalascala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StatefulProcessor
import java.time.Duration

case class UserEvent(userId: String, action: String, timestamp: Long)
case class SessionSummary(userId: String, actionCount: Int, durationMs: Long)

class SessionProcessor extends StatefulProcessor[String, UserEvent, SessionSummary] {

  // ValueState with 30-minute TTL — auto-cleanup of idle sessions
  @transient private var sessionStart: ValueState[Long] = _
  @transient private var actionCount: ValueState[Int] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    sessionStart = getHandle.getValueState[Long]("start", TTLConfig(Duration.ofMinutes(30)))
    actionCount = getHandle.getValueState[Int]("count", TTLConfig(Duration.ofMinutes(30)))
  }

  override def handleInputRows(
      key: String,
      rows: Iterator[UserEvent],
      timerValues: TimerValues
  ): Iterator[SessionSummary] = {
    rows.flatMap { event =>
      if (!sessionStart.exists()) {
        sessionStart.update(event.timestamp)
        actionCount.update(1)
        Iterator.empty
      } else {
        val count = actionCount.get() + 1
        actionCount.update(count)
        // Emit summary every 10 actions
        if (count % 10 == 0) {
          val duration = event.timestamp - sessionStart.get()
          Iterator(SessionSummary(key, count, duration))
        } else Iterator.empty
      }
    }
  }
}

3つの状態型がほとんどのストリーミングユースケースをカバーします。キーごとの単一値用のValueState、イベントログのような追記型パターン用のListState、グルーピングキー内のキーバリュー検索用のMapStateです。TTLベースの有効期限は手動の状態クリーンアップを不要にし、古い状態がメモリ圧迫を引き起こす長時間実行パイプラインにとって重要な機能です。

Spark 4.1のSpark Declarative Pipelines(SDP)

Spark 4.1にはSpark Declarative Pipelinesが追加されました。エンジニアがデータセットと変換を定義し、Sparkが実行順序、並列化、チェックポイント、リトライを処理するフレームワークです。SDPはSQL中心の変換ワークフローでdbtと直接競合しますが、Spark内部でネイティブに実行されます。

python
# declarative_pipeline.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, current_date

spark = SparkSession.builder.appName("SDP-Demo").getOrCreate()

# Define datasets declaratively — Spark resolves dependencies
@spark.declarative.table("clean_orders")
def clean_orders():
    return spark.read.table("raw_orders") \
        .filter(col("status") != "cancelled") \
        .filter(col("amount") > 0)

@spark.declarative.table("daily_revenue")
def daily_revenue():
    return spark.read.table("clean_orders") \
        .groupBy("order_date", "region") \
        .agg(_sum("amount").alias("total_revenue"))

@spark.declarative.table("revenue_summary")
def revenue_summary():
    return spark.read.table("daily_revenue") \
        .groupBy("region") \
        .agg(_sum("total_revenue").alias("grand_total")) \
        .orderBy("grand_total", ascending=False)

# Execute — Spark builds the DAG, parallelizes, handles retries
spark.declarative.run()

SDPは依存関係グラフを自動的に管理します。clean_ordersが失敗した場合、下流のテーブルのみがブロックされます。チェックポイントは中間結果を永続化するため、障害からの回復時にはすでに完了したステージをスキップできます。オーケストレーションにAirflowを使用しているチームにとって、SDPがパイプライン内の依存関係を処理し、Airflowがパイプライン間のスケジューリングを管理するという役割分担が可能です。

Spark 4に関するデータエンジニアリング面接質問

採用チームはSpark 4の知識をますますテストするようになっています。以下は最も頻出する質問と簡潔な回答です。

面接のポイント

Sparkの質問に回答する際は、その動作がどのバージョンに適用されるかを必ず明記してください。Spark 4はいくつかのデフォルト値を変更しており、特にANSIモードとストリーミングセマンティクスについてはSpark 3.xの回答と矛盾する場合があります。

Q1: Spark 4でANSIモードがデフォルト有効になると何が変わりますか?

算術オーバーフロー、無効なキャスト、ゼロ除算は、NULLを返したりサイレントにラップしたりする代わりにランタイム例外をスローします。移行にはCASTTRY_CASTに置き換え、数値操作に明示的なエラー処理を追加する必要があります。

Q2: VARIANT型とその使用場面について説明してください。

VARIANTはカラムシュレッディングを備えた最適化バイナリ形式で半構造化データを格納します。生のJSON文字列とは異なり、VARIANTはストレージレイヤーで述語プッシュダウンと選択的フィールド読み取りを可能にします。スキーマが変化するイベントデータ、APIレスポンスログ、行ごとに構造が異なるカラムに最適です。

Q3: リアルタイムモードとマイクロバッチストリーミングの違いは何ですか?

マイクロバッチは離散的な間隔(通常100ms以上)でデータを処理します。RTMはパイプラインスケジューリングによる連続データフローを使用し、ステートレスワークロードで1桁ミリ秒のレイテンシを達成します。RTMはバッチ境界を完全に排除し、ソースからオペレーターを経由してシンクまでステップ間のチェックポイントなしにデータが流れます。

Q4: transformWithStateとは何か、なぜmapGroupsWithStateを置き換えるのですか?

transformWithStateは複合状態型(ValueState、ListState、MapState)、ネイティブTTLベースの有効期限、イベント駆動タイマーをサポートします。mapGroupsWithStateは手動タイムアウト処理を伴う単一の不透明な状態オブジェクトのみをサポートしていました。新しいAPIはセッションウィンドウ、重複排除、ステートフル集計のボイラープレートを削減します。

Q5: Spark Connectが解決する問題は何ですか?

Spark ConnectはgRPC経由でクライアントをSparkランタイムから分離します。pyspark-clientパッケージはフルPySparkの300MB超に対してわずか1.5MBです。クライアントはローカルJVMなしでリモートクラスタに接続でき、軽量ノートブック環境と独立したクライアント/サーバーバージョンのアップグレードを可能にします。

Spark 4の完全なETLパイプライン構築

すべてをまとめると、Kafkaから読み取り、VARIANTペイロードをパースし、ETLパターンを適用してDelta Lakeテーブルに書き込むパイプラインが構築できます。

python
# spark4_etl_pipeline.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, current_timestamp, parse_json

spark = SparkSession.builder \
    .appName("Spark4ETL") \
    .config("spark.sql.ansi.enabled", "true") \
    .getOrCreate()

# Bronze: raw ingestion from Kafka
bronze = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "user-events") \
    .load() \
    .select(
        col("key").cast("string").alias("event_key"),
        parse_json(col("value").cast("string")).alias("payload"),
        col("timestamp").alias("kafka_ts")
    )

# Silver: extract typed fields from VARIANT
silver = bronze.select(
    col("event_key"),
    col("payload:user_id").cast("int").alias("user_id"),
    col("payload:action").cast("string").alias("action"),
    col("payload:metadata.session_id").cast("string").alias("session_id"),
    col("kafka_ts"),
    current_timestamp().alias("processed_at")
).filter(
    col("user_id").isNotNull()  # ANSI mode makes this explicit
)

# Gold: write to Delta Lake with schema enforcement
query = silver.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3a://checkpoints/user-events/") \
    .option("mergeSchema", "true") \
    .toTable("gold.user_events")

query.awaitTermination()

このパイプラインはSpark 4の3つの機能を組み合わせています。柔軟なペイロードパース用のVARIANT、厳格な型安全性のためのANSIモード、そして継続的インジェスト用のStructured Streamingです。メダリオンアーキテクチャ(ブロンズ/シルバー/ゴールド)はSparkの読み取り-変換-書き込みモデルにきれいにマッピングされます。

まとめ

  • ANSIモードによりデフォルトのSQL動作が変更されます。Spark 3.xからアップグレードする前に、すべてのCASTと数値操作を監査してください
  • VARIANTはJSON文字列カラムを最適化バイナリ形式に置き換え、ネストされたフィールドアクセスで最大8倍高速化します
  • Spark 4.1のリアルタイムモードは、単一の設定変更で有効化されるステートレスストリーミングで1桁ミリ秒のレイテンシを達成します
  • Spark Connectと1.5MBのpyspark-clientは、ノートブックとCIをクラスタランタイムから分離し、デプロイを簡素化します
  • transformWithStateは複合状態型とTTL有効期限を提供し、制限のあったmapGroupsWithState APIを置き換えます
  • Spark Declarative Pipelinesはパイプライン内の依存関係管理を処理し、パイプライン間のスケジューリングにはAirflowなどのオーケストレーターを補完します
  • 面接準備では、ANSIモード移行、VARIANT対JSON格納、RTM対マイクロバッチのトレードオフ、Spark Connectアーキテクチャに焦点を当てるべきです

今すぐ練習を始めましょう!

面接シミュレーターと技術テストで知識をテストしましょう。

タグ

#apache-spark
#data-engineering
#structured-streaming
#interview
#pyspark

共有

関連記事