Apache Spark 4 in 2026: New Features, Structured Streaming and Interview Questions

A comprehensive guide to Apache Spark 4.x covering ANSI mode, VARIANT type, Real-Time Mode streaming, Spark Connect, and common data engineering interview questions with code examples.


Apache Spark 4 represents the most significant overhaul of the Spark engine since version 3.0. Released in mid-2025 with rapid iteration through 4.1 and the upcoming 4.2, the 4.x line reshapes how data engineers build pipelines, run streaming workloads, and handle SQL at scale. This guide breaks down the critical changes, demonstrates them with PySpark and Scala code, and covers the interview questions that hiring teams ask most.

Spark 4 Release Timeline

Spark 4.0.0 shipped in May 2025 with 5,100+ resolved tickets from 390 contributors. Spark 4.1.0 followed in December 2025, adding Real-Time Mode streaming and Declarative Pipelines. Spark 4.2 preview builds started in February 2026.

ANSI SQL Mode Now On by Default in Spark 4

Spark 4.0 flips spark.sql.ansi.enabled to true. Arithmetic overflows, invalid casts, and divisions by zero that previously returned NULL or silently wrapped now throw runtime exceptions. The goal: strict SQL standard compliance, matching PostgreSQL and Oracle behavior.

The practical impact hits ETL pipelines hardest. Queries that relied on implicit coercion or silent NULL propagation will fail at runtime until explicitly handled.

python
# ansi_mode_migration.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, try_divide

spark = SparkSession.builder.appName("ANSIMigration").getOrCreate()

# This now throws ArithmeticException in Spark 4 ANSI mode
# SELECT CAST('abc' AS INT)  -- runtime error

# Safe migration: use TRY_CAST to preserve NULL behavior
df = spark.sql("""
    SELECT
        TRY_CAST(raw_value AS INT) AS parsed_value,
        TRY_CAST(amount AS DECIMAL(10,2)) AS safe_amount
    FROM raw_events
    WHERE TRY_CAST(raw_value AS INT) IS NOT NULL
""")

# Alternatively, wrap arithmetic in try_divide / try_add to get NULL
# instead of an exception when the denominator is zero
result = df.withColumn(
    "ratio",
    try_divide(col("safe_amount"), col("parsed_value"))  # NULL on division by zero
)

Migration checklist: audit every CAST call, replace with TRY_CAST where NULL-safe behavior is expected, and add explicit overflow checks on numeric columns. The Spark SQL migration guide documents every behavioral change.
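
For the overflow checks, the try_* arithmetic functions give the same NULL-on-failure semantics as TRY_CAST. A minimal sketch, assuming a hypothetical metrics table with two BIGINT columns:

python
# overflow_checks.py (illustrative sketch; table and column names are made up)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, try_add, try_multiply

spark = SparkSession.builder.appName("OverflowChecks").getOrCreate()

# Hypothetical table with BIGINT columns a and b
metrics = spark.read.table("metrics")

checked = metrics \
    .withColumn("total", try_add(col("a"), col("b"))) \
    .withColumn("product", try_multiply(col("a"), col("b")))

# Under ANSI semantics plain + and * throw on overflow; try_add / try_multiply
# return NULL instead, so overflowing rows can be quarantined rather than failing the job
overflowed = checked.filter(
    col("a").isNotNull() & col("b").isNotNull() &
    (col("total").isNull() | col("product").isNull())
)
overflowed.write.mode("append").saveAsTable("quarantine.metrics_overflow")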

VARIANT Data Type for Semi-Structured Data

Spark 4.0 adds a native VARIANT column type designed for JSON-like semi-structured data. Unlike storing raw JSON strings and parsing at query time, VARIANT uses an optimized binary format with column shredding that delivers up to 8x faster reads on nested field access.

scala
// VariantExample.scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("VariantDemo").getOrCreate()
import spark.implicits._

// Create a table with VARIANT column
spark.sql("""
    CREATE TABLE events (
        event_id BIGINT,
        timestamp TIMESTAMP,
        payload VARIANT
    ) USING PARQUET
""")

// Insert JSON data — automatically converted to VARIANT
spark.sql("""
    INSERT INTO events VALUES
    (1, current_timestamp(),
     PARSE_JSON('{"user_id": 42, "action": "click", "metadata": {"page": "/home", "duration_ms": 320}}')),
    (2, current_timestamp(),
     PARSE_JSON('{"user_id": 87, "action": "purchase", "metadata": {"item_id": "SKU-9001", "amount": 49.99}}'))
""")

// Query nested fields with dot notation — uses shredding for fast access
val clicks = spark.sql("""
    SELECT
        event_id,
        payload:user_id::INT AS user_id,
        payload:action::STRING AS action,
        payload:metadata.page::STRING AS page
    FROM events
    WHERE payload:action::STRING = 'click'
""")
clicks.show()

VARIANT eliminates the need for schema-on-read workarounds. In data lake architectures where event schemas evolve frequently, a single VARIANT column absorbs schema changes without DDL migrations. Spark 4.1 extended VARIANT with shredding at the Parquet level, further reducing IO for selective field access.
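
The same extraction works from the DataFrame API without SQL strings. A short PySpark sketch against the events table from the Scala example, using variant_get and try_variant_get; the latter returns NULL instead of erroring when a path is missing or the cast fails, which helps while schemas drift:

python
# variant_pyspark.py (sketch; reuses the events table created above)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, variant_get, try_variant_get

spark = SparkSession.builder.appName("VariantPySpark").getOrCreate()

events = spark.read.table("events")

typed = events.select(
    col("event_id"),
    variant_get(col("payload"), "$.user_id", "int").alias("user_id"),
    variant_get(col("payload"), "$.action", "string").alias("action"),
    # Older events may predate metadata.duration_ms; return NULL instead of failing
    try_variant_get(col("payload"), "$.metadata.duration_ms", "long").alias("duration_ms")
)
typed.filter(col("action") == "click").show()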

Real-Time Mode Streaming in Spark 4.1

Structured Streaming in Spark 4.1 introduces Real-Time Mode (RTM), the first official support for sub-second latency in Spark. For stateless workloads, latency drops to single-digit milliseconds, directly competing with Apache Flink on its home turf.

Three architectural changes make this possible: continuous data flow (no micro-batch boundaries), pipeline scheduling (overlap read/compute/write stages), and streaming shuffle (state stored in memory rather than checkpointed between batches).

python
# real_time_streaming.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_json, struct
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("RTMDemo") \
    .config("spark.sql.streaming.mode", "realtime") \
    .getOrCreate()

schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("temperature", DoubleType()),
    StructField("event_time", TimestampType())
])

# Read from Kafka with Real-Time Mode enabled
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "sensor-readings") \
    .option("startingOffsets", "latest") \
    .load()

parsed = raw_stream \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# Write to Kafka sink — sub-second end-to-end
# (the Kafka sink expects string or binary key/value columns)
query = parsed \
    .filter(col("temperature") > 80.0) \
    .select(
        col("sensor_id").alias("key"),
        to_json(struct("sensor_id", "temperature", "event_time")).alias("value")
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "alerts") \
    .option("checkpointLocation", "/tmp/checkpoints/alerts") \
    .start()

query.awaitTermination()

Activating RTM requires a single config change: spark.sql.streaming.mode = realtime. No code rewrite is needed. In Spark 4.1, RTM is limited to stateless, single-stage Scala and PySpark queries with Kafka sources and Kafka or Foreach sinks; stateful support is expected in Spark 4.2.

Spark Connect and the Lightweight PySpark Client

Spark 4.0 ships a standalone pyspark-client package weighing just 1.5 MB, compared to the full 300+ MB PySpark distribution. This thin client communicates with a remote Spark cluster over gRPC through Spark Connect, enabling notebook environments and CI pipelines to run Spark jobs without bundling the full runtime.

python
# spark_connect_client.py
from pyspark.sql import SparkSession

# Connect to remote Spark cluster — no local JVM needed
spark = SparkSession.builder \
    .remote("sc://spark-cluster.internal:15002") \
    .getOrCreate()

# Full DataFrame API available through thin client
df = spark.read.parquet("s3a://data-lake/events/2026/")

aggregated = df.groupBy("country", "event_type") \
    .count() \
    .orderBy("count", ascending=False)

aggregated.show(20)

Spark Connect also supports Java and Swift clients. The spark.api.mode config switches between classic embedded mode and Connect mode without code changes. For data engineering teams, this decouples client environments from cluster upgrades: update the server, and clients keep working.
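
The toggle itself is just a session config. A minimal sketch; how the Connect endpoint is supplied (for example via the builder's remote() call shown above or the SPARK_REMOTE environment variable) is left out:

python
# api_mode_switch.py (sketch of the spark.api.mode toggle)
from pyspark.sql import SparkSession

# "classic" keeps the embedded in-process driver; "connect" routes the session
# through Spark Connect. The DataFrame code below is identical either way.
spark = SparkSession.builder \
    .appName("ApiModeDemo") \
    .config("spark.api.mode", "connect") \
    .getOrCreate()

spark.range(10).selectExpr("id", "id * 2 AS doubled").show()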


transformWithState API for Complex Streaming Logic

Spark 4.0 replaces the limited mapGroupsWithState with transformWithState, a flexible API supporting composite state types, TTL-based expiry, and event-driven timers.

scala
// SessionTracker.scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._
import java.time.Duration

case class UserEvent(userId: String, action: String, timestamp: Long)
case class SessionSummary(userId: String, actionCount: Int, durationMs: Long)

class SessionProcessor extends StatefulProcessor[String, UserEvent, SessionSummary] {

  // ValueState with 30-minute TTL — auto-cleanup of idle sessions
  @transient private var sessionStart: ValueState[Long] = _
  @transient private var actionCount: ValueState[Int] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    sessionStart = getHandle.getValueState[Long]("start", Encoders.scalaLong, TTLConfig(Duration.ofMinutes(30)))
    actionCount = getHandle.getValueState[Int]("count", Encoders.scalaInt, TTLConfig(Duration.ofMinutes(30)))
  }

  override def handleInputRows(
      key: String,
      rows: Iterator[UserEvent],
      timerValues: TimerValues
  ): Iterator[SessionSummary] = {
    rows.flatMap { event =>
      if (!sessionStart.exists()) {
        sessionStart.update(event.timestamp)
        actionCount.update(1)
        Iterator.empty
      } else {
        val count = actionCount.get() + 1
        actionCount.update(count)
        // Emit summary every 10 actions
        if (count % 10 == 0) {
          val duration = event.timestamp - sessionStart.get()
          Iterator(SessionSummary(key, count, duration))
        } else Iterator.empty
      }
    }
  }
}

Three state types cover most streaming use cases: ValueState for single values per key, ListState for append-heavy patterns like event logs, and MapState for key-value lookups within a grouping key. TTL-based expiry eliminates manual state cleanup, critical for long-running pipelines where stale state causes memory pressure.

Spark Declarative Pipelines (SDP) in Spark 4.1

Spark 4.1 adds Spark Declarative Pipelines, a framework where engineers define datasets and transformations while Spark handles execution ordering, parallelism, checkpoints, and retries. SDP competes directly with dbt for SQL-heavy transformation workflows, but runs natively inside Spark.

python
# declarative_pipeline.py
# Sketch of the Python interface exposed by the pyspark.pipelines module in Spark 4.1;
# decorated functions return DataFrames, and Spark resolves dependencies from the
# table reads inside them. Exact decorator names may vary between 4.1 builds.
from pyspark import pipelines as dp
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum

# The pipeline runner supplies the active session
spark = SparkSession.getActiveSession()

@dp.materialized_view
def clean_orders():
    return spark.read.table("raw_orders") \
        .filter(col("status") != "cancelled") \
        .filter(col("amount") > 0)

@dp.materialized_view
def daily_revenue():
    return spark.read.table("clean_orders") \
        .groupBy("order_date", "region") \
        .agg(_sum("amount").alias("total_revenue"))

@dp.materialized_view
def revenue_summary():
    return spark.read.table("daily_revenue") \
        .groupBy("region") \
        .agg(_sum("total_revenue").alias("grand_total")) \
        .orderBy("grand_total", ascending=False)

# Execution happens outside this file: the spark-pipelines CLI builds the DAG,
# runs independent datasets in parallel, and handles retries against the pipeline spec

SDP manages the dependency graph automatically. If clean_orders fails, only downstream tables are blocked. Checkpoints persist intermediate results, so recovering from failures skips already-completed stages. For data engineering teams already using Airflow for orchestration, SDP handles intra-pipeline dependencies while Airflow manages inter-pipeline scheduling.
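
In that split, the Airflow side can stay thin. A hypothetical DAG that simply shells out to the SDP runner; the task name, schedule, working directory, and the spark-pipelines invocation are all illustrative assumptions:

python
# airflow_sdp_dag.py (hypothetical Airflow DAG; SDP owns the intra-pipeline DAG)
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="revenue_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    # One Airflow task per Spark pipeline; table-level ordering, retries,
    # and checkpoints inside the pipeline are handled by SDP itself
    run_revenue_pipeline = BashOperator(
        task_id="run_revenue_pipeline",
        bash_command="cd /opt/pipelines/revenue && spark-pipelines run",
    )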

Data Engineering Interview Questions on Spark 4

Hiring teams increasingly test Spark 4 knowledge. Here are the questions that come up most, with concise answers.

Interview Tip

When answering Spark questions, always specify which version the behavior applies to. Spark 4 changed several defaults that contradict Spark 3.x answers, particularly around ANSI mode and streaming semantics.

Q1: What changes when ANSI mode is enabled by default in Spark 4?

Arithmetic overflows, invalid casts, and division by zero throw runtime exceptions instead of returning NULL or silently wrapping. Migration requires replacing CAST with TRY_CAST and adding explicit error handling for numeric operations. The config spark.sql.ansi.enabled controls the behavior.

Q2: Explain the VARIANT data type and when to use it.

VARIANT stores semi-structured data in an optimized binary format with column shredding. Unlike raw JSON strings, VARIANT enables predicate pushdown and selective field reads at the storage layer. Best suited for event data with evolving schemas, API response logging, or any column where structure varies across rows.

Q3: How does Real-Time Mode differ from micro-batch streaming?

Micro-batch processes data in discrete intervals (typically 100ms+). RTM uses continuous data flow with pipeline scheduling, achieving single-digit millisecond latency for stateless workloads. RTM removes batch boundaries entirely: data flows from source through operators to sink without checkpointing between steps. Currently limited to stateless, single-stage queries with Kafka sources.
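
A small sketch of the micro-batch side, using the built-in rate source as a stand-in for Kafka; the RTM comparison lives in the comments because it amounts to the session config described earlier:

python
# microbatch_trigger_demo.py (sketch; the rate source stands in for Kafka)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TriggerDemo").getOrCreate()

rate = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

# Micro-batch: each trigger interval becomes a discrete batch with its own planning
# and checkpoint commit, so end-to-end latency is bounded below by the batch cycle
query = rate.writeStream \
    .format("console") \
    .trigger(processingTime="1 second") \
    .start()

# RTM removes the trigger: with spark.sql.streaming.mode=realtime set on the session
# (see the Kafka example earlier), records flow through the running query continuously
# instead of being grouped into batches.
query.awaitTermination(30)
query.stop()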

Q4: What is transformWithState and why replace mapGroupsWithState?

transformWithState supports composite state types (ValueState, ListState, MapState), native TTL-based expiry, and event-driven timers. mapGroupsWithState only supported a single opaque state object with manual timeout handling. The new API reduces boilerplate for session windows, deduplication, and stateful aggregations.

Q5: What problem does Spark Connect solve?

Spark Connect decouples the client from the Spark runtime via gRPC. The pyspark-client package is 1.5 MB versus 300+ MB for full PySpark. Clients can connect to remote clusters without a local JVM, enabling lightweight notebook environments and independent client/server version upgrades.

Common Mistake

Candidates often confuse Spark Connect with Apache Livy. Livy proxies REST calls to a Spark cluster; Spark Connect provides a native client-server protocol with full DataFrame API support and session isolation. Spark 4.0 ships Connect as a first-class feature, not an add-on.

Building a Complete Spark 4 ETL Pipeline

Putting the pieces together: a pipeline that reads from Kafka, parses VARIANT payloads, applies ETL patterns, and writes to a Delta Lake table.

python
# spark4_etl_pipeline.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, parse_json, try_variant_get, variant_get

spark = SparkSession.builder \
    .appName("Spark4ETL") \
    .config("spark.sql.ansi.enabled", "true") \
    .getOrCreate()

# Bronze: raw ingestion from Kafka
bronze = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "user-events") \
    .load() \
    .select(
        col("key").cast("string").alias("event_key"),
        parse_json(col("value").cast("string")).alias("payload"),
        col("timestamp").alias("kafka_ts")
    )

# Silver: extract typed fields from the VARIANT payload
silver = bronze.select(
    col("event_key"),
    try_variant_get(col("payload"), "$.user_id", "int").alias("user_id"),
    variant_get(col("payload"), "$.action", "string").alias("action"),
    variant_get(col("payload"), "$.metadata.session_id", "string").alias("session_id"),
    col("kafka_ts"),
    current_timestamp().alias("processed_at")
).filter(
    col("user_id").isNotNull()  # drop events without a parseable user_id
)

# Gold: append to a Delta Lake table, allowing additive schema evolution via mergeSchema
query = silver.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3a://checkpoints/user-events/") \
    .option("mergeSchema", "true") \
    .toTable("gold.user_events")

query.awaitTermination()

This pipeline combines three Spark 4 features: VARIANT for flexible payload parsing, ANSI mode for strict type safety, and Structured Streaming for continuous ingestion. The medallion architecture (bronze/silver/gold) maps cleanly to Spark's read-transform-write model.

Conclusion

  • ANSI mode changes default SQL behavior; audit every CAST and numeric operation before upgrading from Spark 3.x
  • VARIANT replaces JSON string columns with an optimized binary format delivering up to 8x faster nested field access
  • Real-Time Mode in Spark 4.1 achieves single-digit millisecond latency for stateless streaming, activated with a single config change
  • Spark Connect and the 1.5 MB pyspark-client decouple notebooks and CI from cluster runtime, simplifying deployment
  • transformWithState provides composite state types and TTL expiry, replacing the limited mapGroupsWithState API
  • Spark Declarative Pipelines handle intra-pipeline dependency management, complementing orchestrators like Airflow for inter-pipeline scheduling
  • Interview prep should focus on ANSI mode migration, VARIANT vs JSON storage, RTM vs micro-batch trade-offs, and Spark Connect architecture


Tags

#apache-spark
#data-engineering
#structured-streaming
#interview
#pyspark
