Apache Spark met Python: datapijplijnen stap voor stap bouwen
Een praktische PySpark-tutorial over DataFrame-operaties, de opbouw van ETL-pijplijnen en de functies van Spark 4.0. Met productie-klare codevoorbeelden voor data engineers die zich voorbereiden op technische interviews.

Apache Spark blijft in 2026 de dominante gedistribueerde verwerkingsengine voor grootschalige datapijplijnen. Samen met PySpark biedt het een Python-native API die zowel batch- als streamingworkloads aankan op clusters van elke omvang zonder performance in te leveren.
Deze tutorial doorloopt het bouwen van een volledige ETL-datapijplijn met PySpark, van ruwe ingestie tot schone output, aan de hand van Spark 4.0-functies zoals de Python Data Source API en native DataFrame-plotting.
PySpark DataFrames zijn immutable. Elke transformatie geeft een nieuw DataFrame terug en laat het origineel ongemoeid. Dit ontwerp maakt Sparks lazy evaluation mogelijk: transformaties worden pas uitgevoerd wanneer een actie (.show(), .write() of .collect()) het rekenplan activeert.
Een PySpark 4.0-omgeving opzetten
Voordat er ook maar één pijplijn wordt gebouwd, vereist de Spark-sessie de juiste configuratie. Spark 4.0 zet ANSI-modus standaard aan, waardoor strengere SQL-semantiek geldt — numerieke overflows gooien nu uitzonderingen in plaats van stil door te lopen.
# spark_setup.py
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("ETLPipeline")
.config("spark.sql.adaptive.enabled", "true") # AQE for runtime optimization
.config("spark.sql.shuffle.partitions", "200") # Tune based on cluster size
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
)
# Verify Spark version
print(spark.version) # 4.0.0Adaptive Query Execution (AQE), hier ingeschakeld, optimaliseert dynamisch shuffle-partities en joinstrategieën tijdens runtime. Spark 4.0 levert 20 tot 50% snelheidswinst op typische ETL-workloads vergeleken met Spark 3.x, grotendeels door verbeteringen aan AQE en de Catalyst-optimizer.
Ruwe data lezen met de DataFrame API
PySpark DataFrames verbergen gedistribueerde berekeningen achter een vertrouwde tabulaire API. Het lezen uit diverse bronnen — CSV, Parquet, JSON, databases — volgt een consistent patroon.
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
StructField("order_id", StringType(), nullable=False),
StructField("customer_id", StringType(), nullable=False),
StructField("product_code", StringType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
StructField("order_date", TimestampType(), nullable=True),
StructField("region", StringType(), nullable=True),
])
# Read CSV with explicit schema
raw_orders = (
spark.read
.schema(order_schema) # Skip inference, enforce types
.option("header", "true") # First row contains column names
.option("mode", "DROPMALFORMED") # Skip rows that don't match schema
.csv("s3a://data-lake/raw/orders/")
)
# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")
raw_orders.printSchema()
raw_orders.show(5, truncate=False)Een expliciete schemadefinitie is een productie-best practice. Schema-inferentie vereist een volledige scan van de databron, wat duur is op grote datasets en types verkeerd kan raden (postcodes bijvoorbeeld geïnterpreteerd als integers).
DataFrames opschonen en transformeren
Ruwe data moet altijd worden opgeschoond. PySpark-transformaties ketenen natuurlijk via de DataFrame API en omdat DataFrames immutable zijn, levert elke stap een nieuw DataFrame op zonder het origineel te wijzigen.
# clean_orders.py
from pyspark.sql import functions as F
cleaned_orders = (
raw_orders
.filter(F.col("order_id").isNotNull()) # Remove null primary keys
.filter(F.col("amount") > 0) # Filter invalid amounts
.withColumn("customer_id", F.trim(F.upper(F.col("customer_id")))) # Normalize IDs
.withColumn("order_date", F.to_date(F.col("order_date"))) # Cast to date type
.withColumn("year_month", # Extract partition key
F.date_format(F.col("order_date"), "yyyy-MM"))
.dropDuplicates(["order_id"]) # Deduplicate by order ID
)
print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")Volgorde telt in transformatieketens. customer_id normaliseren vóór deduplicatie zorgt voor consistente matching. year_month extraheren na het parsen van de datum voorkomt null-waarden in de partitiekolom.
Een .collect()-aanroep haalt het volledige gedistribueerde DataFrame naar het geheugen van de driver. Op datasets die het RAM van de driver overschrijden, veroorzaakt dit een OutOfMemoryError. Gebruik in plaats daarvan .show(), .take() of .toPandas() op vooraf gefilterde subsets.
Joinen en aggregeren over meerdere bronnen
Echte pijplijnen combineren data uit meerdere bronnen. PySpark ondersteunt alle standaard join-types, en AQE in Spark 4.0 kiest automatisch broadcast joins voor kleine tabellen zonder handmatige hints.
# enrich_orders.py
# Join orders with product catalog
enriched = (
cleaned_orders
.join(products, on="product_code", how="left") # Keep all orders, even unmatched
.select(
"order_id",
"customer_id",
"product_code",
F.col("name").alias("product_name"), # Rename for clarity
"amount",
F.col("category"), # From products table
"order_date",
"year_month",
)
)
# Aggregate: monthly revenue per product category
monthly_revenue = (
enriched
.groupBy("year_month", "category")
.agg(
F.sum("amount").alias("total_revenue"), # Sum all order amounts
F.countDistinct("order_id").alias("order_count"), # Unique orders
F.avg("amount").alias("avg_order_value"), # Average basket size
)
.orderBy(F.desc("total_revenue"))
)
monthly_revenue.show(10)De left join behoudt alle orders, ook wanneer een productcode geen match heeft in de catalogus — een veelvoorkomend scenario tijdens datamigraties of wanneer de productcatalogus achterloopt op het transactiesysteem.
Klaar om je Data Engineering gesprekken te halen?
Oefen met onze interactieve simulatoren, flashcards en technische tests.
Geoptimaliseerde output schrijven met partitionering
De laatste stap in een ETL-pijplijn schrijft de getransformeerde data naar een doelopslag. De output partitioneren op veelgebruikte query-kolommen verkort leessnelheden drastisch voor downstream consumenten.
# write_output.py
# Write enriched data partitioned by year_month
(
enriched
.repartition("year_month") # Align partitions with output
.write
.mode("overwrite") # Replace existing partition data
.partitionBy("year_month") # Physical directory partitioning
.parquet("s3a://data-lake/curated/enriched_orders/")
)
# Write aggregated metrics as a single compact file
(
monthly_revenue
.coalesce(1) # Single output file for small results
.write
.mode("overwrite")
.parquet("s3a://data-lake/curated/monthly_revenue/")
)
print("Pipeline complete — output written to curated layer").repartition() gebruiken vóór .partitionBy() zorgt dat elke fysieke partitie een redelijk aantal bestanden bevat. Zonder deze stap kan Spark honderden minuscule bestanden per partitie produceren — een klassiek anti-pattern dat het "small files problem" heet en de leesperformance op HDFS en object stores aantast.
Spark 4.0: de Python Data Source API voor maatwerkconnectors
Spark 4.0 introduceert de Python Data Source API en maakt daarmee een einde aan de voormalige verplichting om connectors in Java of Scala te schrijven. Dit vereenvoudigt de integratie met propriëtaire systemen, REST-API's of niche-bestandsformaten.
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
class APIDataSource(DataSource):
"""Custom data source reading from an internal REST API."""
@classmethod
def name(cls) -> str:
return "internal_api" # Registered source name
def schema(self) -> StructType:
return StructType([ # Define output schema
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("status", StringType()),
])
def reader(self, schema: StructType) -> "APIDataSourceReader":
return APIDataSourceReader(self.options) # Pass options to reader
class APIDataSourceReader(DataSourceReader):
def __init__(self, options):
self.endpoint = options.get("endpoint", "") # API endpoint URL
def read(self, partition):
import requests # Import inside read for serialization
response = requests.get(self.endpoint)
for record in response.json():
yield (record["id"], record["name"], record["status"])
# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()De Python Data Source API ondersteunt zowel batch- als streaminglezen. Voor productiegebruik verbetert het toevoegen van foutafhandeling in read() en het implementeren van parallelisme op partitieniveau de doorvoer aanzienlijk.
Spark Connect ontkoppelt clientapplicaties van het Spark-cluster. De lichtgewicht PySpark-client (pyspark-client) weegt slechts 1,5 MB tegenover 355 MB voor het volledige PySpark-pakket. Hiermee kunnen Spark-jobs vanuit elke IDE of notebook worden gedraaid zonder lokale Spark-installatie.
Performance-tuningchecklist voor PySpark-pijplijnen
Een werkende pijplijn bouwen is slechts een deel van de uitdaging. Hem op schaal performant maken vraagt aandacht voor partitionering, caching en joinstrategieën.
| Techniek | Wanneer toepassen | Impact |
|-----------|---------------|--------|
| Expliciete schemas | Bij elke leesoperatie | Elimineert de schema-inferentiescan |
| Partition pruning | Filteren op gepartitioneerde datasets | Slaat irrelevante partities volledig over |
| Broadcast joins | Kleine tabel (< 10 MB) gekoppeld aan grote tabel | Vermijdt kostbare shuffle |
| Caching | DataFrame die in meerdere acties wordt hergebruikt | Voorkomt herberekening |
| Coalesce vs Repartition | Aantal partities verminderen | coalesce vermijdt volledige shuffle |
| AQE | Altijd (standaard in Spark 4.0) | Runtime-optimalisatie van joins en shuffles |
De pijplijn-performance monitoren via de Spark UI blijft essentieel. De DAG-visualisatie onthult shuffle-grenzen, en het SQL-tabblad toont het fysieke plan dat de Catalyst-optimizer koos.
Conclusie
- PySpark 4.0 biedt een volwassen, Python-eerste API voor het bouwen van ETL-pijplijnen op elke schaal, met standaard ingeschakelde ANSI SQL-compliance
- Expliciete schemadefinitie, juiste partitionering en AQE elimineren de meest voorkomende performance-bottlenecks in productie
- De nieuwe Python Data Source API haalt de Java/Scala-barrière weg voor maatwerkconnectors — REST-API's, propriëtaire formaten en streamingbronnen zijn nu in puur Python te integreren
- Transformatievolgorde telt: identifiers normaliseren vóór deduplicatie, datums parsen vóór het extraheren van partitiesleutels
- De Data Engineering interviewvragen behandelen vaak Spark-internals zoals lazy evaluation, shuffle-mechanica en partitiestrategieën — deze pijplijnfundamenten beheersen is directe voorbereiding op dat soort gesprekken
- De ETL- en ELT-patronen zijn het verkennen waard voor een diepere behandeling van architecturale pijplijnkeuzes
Begin met oefenen!
Test je kennis met onze gespreksimulatoren en technische tests.
Tags
Delen
Gerelateerde artikelen

ETL vs ELT in 2026: Architectuur van datapipelines uitgelegd
ETL vs ELT vergelijking voor moderne datapipelines. Architectuurverschillen, prestatie-afwegingen en wanneer welke aanpak te gebruiken met Snowflake, BigQuery en dbt in 2026.

De 25 belangrijkste Data Engineering sollicitatievragen in 2026
Een praktische gids met de 25 meest gestelde sollicitatievragen voor data engineers in 2026. Van SQL-optimalisatie en pipeline-architectuur tot system design en datakwaliteit, met codevoorbeelden en uitgebreide antwoorden.