Apache Spark mit Python: Datenpipelines Schritt fuer Schritt aufbauen
Praxisleitfaden zum Aufbau vollstaendiger ETL-Datenpipelines mit PySpark 4.0 -- von der Rohdatenaufnahme ueber Bereinigung und Transformation bis zur optimierten Ausgabe mit Partitionierung, inklusive der neuen Python Data Source API und Performance-Tuning-Checkliste.

Apache Spark ist auch 2026 die fuehrende verteilte Verarbeitungsengine fuer grosse Datenpipelines. Ob Batch-Verarbeitung von Terabytes historischer Transaktionsdaten oder Echtzeit-Streaming von Klickdaten -- Spark verarbeitet beide Szenarien mit derselben API. In Kombination mit PySpark steht eine Python-native Schnittstelle zur Verfuegung, die Workloads ueber Cluster beliebiger Groesse verarbeitet, ohne Abstriche bei der Performance.
Dieses Tutorial zeigt den Aufbau einer vollstaendigen ETL-Datenpipeline mit PySpark, von der Rohdatenaufnahme ueber Bereinigung und Anreicherung bis zur optimierten Ausgabe. Dabei kommen Spark-4.0-Features wie die Python Data Source API und native DataFrame-Visualisierung zum Einsatz. Jeder Schritt enthaelt ausfuehrbaren Code, der sich direkt auf reale Produktionsszenarien uebertragen laesst.
Wer sich auf ein Data-Engineering-Interview vorbereitet, findet hier genau die praktischen Grundlagen, die in technischen Gespraechen erwartet werden: Lazy Evaluation, Shuffle-Mechanismen, Partitionierungsstrategien und die Arbeit mit dem DataFrame-API. Das Verstaendnis dieser Konzepte unterscheidet Kandidaten, die Spark lediglich nutzen, von solchen, die es beherrschen.
PySpark DataFrames sind unveraenderlich (immutable). Jede Transformation gibt einen neuen DataFrame zurueck und laesst den urspruenglichen unberuehrt. Dieses Design ermoeglicht Sparks Lazy Evaluation: Transformationen werden erst ausgefuehrt, wenn eine Aktion (wie .show(), .write() oder .collect()) den Berechnungsplan ausloest.
PySpark-4.0-Umgebung einrichten
Bevor eine Pipeline gebaut werden kann, braucht die Spark-Session eine korrekte Konfiguration. Spark 4.0 aktiviert standardmaessig den ANSI-Modus, der striktere SQL-Semantik durchsetzt -- numerische Ueberlauefe werfen jetzt Exceptions statt stillschweigend umzubrechen. Dieses Verhalten verhindert subtile Datenfehler, die in aelteren Spark-Versionen unbemerkt durch die Pipeline wanderten.
# spark_setup.py
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("ETLPipeline")
.config("spark.sql.adaptive.enabled", "true") # AQE for runtime optimization
.config("spark.sql.shuffle.partitions", "200") # Tune based on cluster size
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
)
# Verify Spark version
print(spark.version) # 4.0.0Adaptive Query Execution (AQE), hier aktiviert, optimiert Shuffle-Partitionen und Join-Strategien dynamisch zur Laufzeit. Anstatt statisch 200 Shuffle-Partitionen zu verwenden, passt AQE die Anzahl basierend auf der tatsaechlichen Datengroesse an. Spark 4.0 liefert bei typischen ETL-Workloads 20--50 % Geschwindigkeitszuwaechse gegenueber Spark 3.x, groesstenteils durch AQE-Verbesserungen und den Catalyst-Optimizer. Der KryoSerializer reduziert den Serialisierungs-Overhead gegenueber dem Standard-Java-Serializer erheblich, was sich insbesondere bei datenintensiven Shuffle-Operationen bemerkbar macht.
Rohdaten mit der DataFrame-API einlesen
PySpark DataFrames abstrahieren die verteilte Berechnung hinter einer vertrauten tabellarischen API. Das Einlesen aus verschiedenen Quellen -- CSV, Parquet, JSON, Datenbanken -- folgt einem einheitlichen Muster. Entscheidend ist dabei die Wahl zwischen expliziter Schemadefinition und automatischer Schema-Inferenz.
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
StructField("order_id", StringType(), nullable=False),
StructField("customer_id", StringType(), nullable=False),
StructField("product_code", StringType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
StructField("order_date", TimestampType(), nullable=True),
StructField("region", StringType(), nullable=True),
])
# Read CSV with explicit schema
raw_orders = (
spark.read
.schema(order_schema) # Skip inference, enforce types
.option("header", "true") # First row contains column names
.option("mode", "DROPMALFORMED") # Skip rows that don't match schema
.csv("s3a://data-lake/raw/orders/")
)
# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")
raw_orders.printSchema()
raw_orders.show(5, truncate=False)Die explizite Schemadefinition gilt als Best Practice fuer Produktionsumgebungen. Schema-Inferenz erfordert einen vollstaendigen Scan der Datenquelle, was bei grossen Datenmengen teuer ist und zu falschen Typzuordnungen fuehren kann -- etwa wenn Postleitzahlen als Integer interpretiert werden oder fuehrende Nullen in Produktcodes verloren gehen. Bei Parquet-Dateien entfaellt dieses Problem, da das Schema direkt im Dateiformat eingebettet ist. Fuer CSV-Dateien in Produktionspipelines sollte das Schema immer explizit definiert werden.
Der Modus DROPMALFORMED verwirft Zeilen, die nicht zum definierten Schema passen. Alternativ schreibt der Modus PERMISSIVE fehlerhafte Zeilen in eine eigene Spalte, was die spaetere Fehleranalyse erleichtert.
DataFrames bereinigen und transformieren
Rohdaten muessen immer bereinigt werden. Fehlende Werte, inkonsistente Formate und Duplikate sind in jedem realen Datensatz anzutreffen. PySpark-Transformationen lassen sich natuerlich ueber die DataFrame-API verketten, und da DataFrames unveraenderlich sind, erzeugt jeder Schritt einen neuen DataFrame, ohne den urspruenglichen zu veraendern. Dieses funktionale Muster erleichtert das Debugging erheblich, da jeder Zwischenschritt einzeln inspiziert werden kann.
# clean_orders.py
from pyspark.sql import functions as F
cleaned_orders = (
raw_orders
.filter(F.col("order_id").isNotNull()) # Remove null primary keys
.filter(F.col("amount") > 0) # Filter invalid amounts
.withColumn("customer_id", F.trim(F.upper(F.col("customer_id")))) # Normalize IDs
.withColumn("order_date", F.to_date(F.col("order_date"))) # Cast to date type
.withColumn("year_month", # Extract partition key
F.date_format(F.col("order_date"), "yyyy-MM"))
.dropDuplicates(["order_id"]) # Deduplicate by order ID
)
print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")Die Reihenfolge der Transformationen ist entscheidend. Die Normalisierung von customer_id vor der Deduplizierung stellt ein konsistentes Matching sicher -- ohne diese Normalisierung wuerden "CUST-001" und "cust-001" als separate Eintraege behandelt. Das Extrahieren von year_month nach dem Parsen des Datums vermeidet Nullwerte in der Partitionsspalte, was spaeter beim Schreiben partitionierter Daten zu Problemen fuehren wuerde.
Die Ausgabe der Zeilenanzahl vor und nach der Bereinigung ist kein reiner Debugging-Schritt: In Produktionspipelines dient diese Differenz als Datenqualitaetsmetrik, die in Monitoring-Systeme eingespeist wird.
Der Aufruf von .collect() laedt den gesamten verteilten DataFrame in den Arbeitsspeicher des Drivers. Bei Datensaetzen, die den verfuegbaren RAM uebersteigen, fuehrt das zu einem OutOfMemoryError. Stattdessen sollten .show(), .take() oder .toPandas() auf vorgefilterte Teilmengen angewendet werden.
Joins und Aggregationen ueber mehrere Quellen
Reale Pipelines kombinieren Daten aus verschiedenen Quellen. Eine isolierte Bestelltabelle hat begrenzten analytischen Wert -- erst durch die Anreicherung mit Produktkatalog, Kundenstammdaten oder geografischen Informationen entsteht ein vollstaendiges Bild. PySpark unterstuetzt alle gaengigen Join-Typen (inner, left, right, full outer, cross, semi, anti), und Spark 4.0 waehlt mit AQE automatisch Broadcast-Joins fuer kleine Tabellen, ohne dass manuelle Hints erforderlich sind.
# enrich_orders.py
# Join orders with product catalog
enriched = (
cleaned_orders
.join(products, on="product_code", how="left") # Keep all orders, even unmatched
.select(
"order_id",
"customer_id",
"product_code",
F.col("name").alias("product_name"), # Rename for clarity
"amount",
F.col("category"), # From products table
"order_date",
"year_month",
)
)
# Aggregate: monthly revenue per product category
monthly_revenue = (
enriched
.groupBy("year_month", "category")
.agg(
F.sum("amount").alias("total_revenue"), # Sum all order amounts
F.countDistinct("order_id").alias("order_count"), # Unique orders
F.avg("amount").alias("avg_order_value"), # Average basket size
)
.orderBy(F.desc("total_revenue"))
)
monthly_revenue.show(10)Der left-Join bewahrt alle Bestellungen, auch wenn ein Produktcode keinen Treffer im Katalog hat -- ein haeufiges Szenario bei Datenmigrationen oder wenn Produktkataloge den Transaktionssystemen hinterherhinken. Die explizite Spaltenauswahl ueber .select() nach dem Join ist eine weitere Best Practice: Sie verhindert die unbeabsichtigte Weitergabe von Spalten mit identischen Namen aus beiden Tabellen, was spaeter zu mehrdeutigen Referenzen fuehren wuerde.
Die Aggregation mit countDistinct statt count stellt sicher, dass doppelte Bestellungs-IDs (falls durch den Join entstanden) das Ergebnis nicht verfaelschen.
Bereit für deine Data Engineering-Interviews?
Übe mit unseren interaktiven Simulatoren, Flashcards und technischen Tests.
Optimierte Ausgabe mit Partitionierung schreiben
Der letzte Schritt einer ETL-Pipeline schreibt die transformierten Daten in eine Zielspeicherschicht. Die Partitionierung der Ausgabe nach haeufig abgefragten Spalten reduziert die Lesezeiten fuer nachgelagerte Konsumenten drastisch. Wenn ein BI-Tool nur Daten fuer einen bestimmten Monat abfragt, muss Spark dank Partition Pruning nur das entsprechende Verzeichnis lesen und nicht den gesamten Datensatz scannen.
# write_output.py
# Write enriched data partitioned by year_month
(
enriched
.repartition("year_month") # Align partitions with output
.write
.mode("overwrite") # Replace existing partition data
.partitionBy("year_month") # Physical directory partitioning
.parquet("s3a://data-lake/curated/enriched_orders/")
)
# Write aggregated metrics as a single compact file
(
monthly_revenue
.coalesce(1) # Single output file for small results
.write
.mode("overwrite")
.parquet("s3a://data-lake/curated/monthly_revenue/")
)
print("Pipeline complete — output written to curated layer")Die Verwendung von .repartition() vor .partitionBy() stellt sicher, dass jede physische Partition eine angemessene Anzahl von Dateien enthaelt. Ohne diesen Schritt kann Spark Hunderte winziger Dateien pro Partition erzeugen -- ein verbreitetes Anti-Pattern, bekannt als das "Small-Files-Problem", das die Leseleistung auf HDFS und Object Stores erheblich verschlechtert. Bei kleinen Ergebnismengen wie den aggregierten Monatskennzahlen erzeugt .coalesce(1) eine einzelne kompakte Datei, was den Overhead beim Lesen minimiert.
Der overwrite-Modus ersetzt die gesamte Partition bei jedem Lauf. Fuer inkrementelle Pipelines, die nur neue Daten hinzufuegen, eignet sich stattdessen der append-Modus.
Spark 4.0: Python Data Source API fuer eigene Konnektoren
Spark 4.0 fuehrt die Python Data Source API ein und beseitigt damit die bisherige Anforderung, eigene Konnektoren in Java oder Scala zu schreiben. Fuer Data-Engineering-Teams, deren Hauptsprache Python ist, bedeutet das einen erheblichen Produktivitaetsgewinn: Interne REST-APIs, proprietaere Dateiformate oder Nischenquellen lassen sich jetzt direkt in die Spark-Pipeline integrieren, ohne den Technologie-Stack zu wechseln.
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
class APIDataSource(DataSource):
"""Custom data source reading from an internal REST API."""
@classmethod
def name(cls) -> str:
return "internal_api" # Registered source name
def schema(self) -> StructType:
return StructType([ # Define output schema
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("status", StringType()),
])
def reader(self, schema: StructType) -> "APIDataSourceReader":
return APIDataSourceReader(self.options) # Pass options to reader
class APIDataSourceReader(DataSourceReader):
def __init__(self, options):
self.endpoint = options.get("endpoint", "") # API endpoint URL
def read(self, partition):
import requests # Import inside read for serialization
response = requests.get(self.endpoint)
for record in response.json():
yield (record["id"], record["name"], record["status"])
# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()Die Python Data Source API unterstuetzt sowohl Batch- als auch Streaming-Reads. Fuer den Produktionseinsatz verbessern die Ergaenzung von Fehlerbehandlung innerhalb von read() und die Implementierung von Partition-Level-Parallelismus den Durchsatz erheblich. Dabei ist zu beachten, dass der import-Befehl innerhalb der read()-Methode stehen muss, da die Methode auf Worker-Nodes ausgefuehrt wird und nur serialisierbare Objekte ueber die Netzwerkgrenze transportiert werden koennen.
Spark Connect entkoppelt Client-Anwendungen vom Spark-Cluster. Der leichtgewichtige PySpark-Client (pyspark-client) ist nur 1,5 MB gross -- im Vergleich zum vollstaendigen PySpark-Paket mit 355 MB. Das ermoeglicht die Ausfuehrung von Spark-Jobs aus jeder IDE oder jedem Notebook, ohne eine lokale Spark-Installation.
Performance-Tuning-Checkliste fuer PySpark-Pipelines
Eine funktionsfaehige Pipeline zu bauen ist nur ein Teil der Herausforderung. Damit sie auch im grossen Massstab performant laeuft, muessen Partitionierung, Caching und Join-Strategien gezielt abgestimmt werden. Die folgende Tabelle fasst die wichtigsten Optimierungstechniken zusammen, die in Data-Engineering-Interviews regelmaessig abgefragt werden.
| Technik | Anwendungsfall | Auswirkung |
|---------|---------------|------------|
| Explizite Schemata | Bei jeder Leseoperation | Eliminiert den Schema-Inferenz-Scan |
| Partition Pruning | Filtern partitionierter Datensaetze | Ueberspringt irrelevante Datenpartitionen vollstaendig |
| Broadcast Joins | Kleine Tabelle (< 10 MB) mit grosser Tabelle | Vermeidet teuren Shuffle |
| Caching | DataFrame wird in mehreren Aktionen wiederverwendet | Verhindert Neuberechnung |
| Coalesce vs. Repartition | Reduzierung der Partitionsanzahl | coalesce vermeidet vollstaendigen Shuffle |
| AQE | Immer (Standard in Spark 4.0) | Laufzeitoptimierung von Joins und Shuffles |
Das Monitoring der Pipeline-Performance ueber die Spark-UI bleibt unverzichtbar. Die DAG-Visualisierung zeigt Shuffle-Grenzen auf, und der SQL-Tab zeigt den vom Catalyst-Optimizer gewaehlten physischen Plan. Besonderes Augenmerk verdient die Spalte "Shuffle Read" in der Stages-Ansicht: Hohe Werte deuten auf fehlende Partitionierung oder ueberdimensionierte Shuffle-Operationen hin. Die Kombination aus expliziten Schemata, gezieltem Caching und AQE loest erfahrungsgemaess ueber 80 % der Performance-Probleme in Produktionspipelines.
Fazit
- PySpark 4.0 bietet eine ausgereifte, Python-first API fuer den Aufbau von ETL-Pipelines in jeder Groessenordnung, mit standardmaessig aktivierter ANSI-SQL-Kompatibilitaet
- Explizite Schemadefinition, korrekte Partitionierung und AQE beseitigen die haeufigsten Performance-Engpaesse in Produktionspipelines
- Die neue Python Data Source API entfernt die Java/Scala-Barriere fuer eigene Konnektoren -- REST-APIs, proprietaere Formate und Streaming-Quellen lassen sich komplett in reinem Python integrieren
- Die Reihenfolge der Transformationen ist wichtig: Identifier normalisieren vor der Deduplizierung, Datumsangaben parsen vor der Extraktion von Partitionsschluesseln
- Die Data-Engineering-Interviewfragen behandeln haeufig Spark-Interna wie Lazy Evaluation, Shuffle-Mechanismen und Partitionierungsstrategien -- das Verstaendnis dieser Pipeline-Grundlagen bereitet direkt auf diese Gespraeche vor
- Weitere Informationen zu ETL- und ELT-Patterns vertiefen Architekturentscheidungen fuer Pipelines
Fang an zu üben!
Teste dein Wissen mit unseren Interview-Simulatoren und technischen Tests.
Tags
Teilen
Verwandte Artikel

ETL vs ELT 2026: Datenpipeline-Architektur im Vergleich
ETL vs ELT Vergleich für moderne Datenpipelines. Architekturunterschiede, Leistungs-Kompromisse und wann welcher Ansatz mit Snowflake, BigQuery und dbt in 2026 sinnvoll ist.

Die 25 wichtigsten Data-Engineering-Interviewfragen 2026 -- mit Antworten und Code
Die häufigsten Data-Engineering-Interviewfragen 2026: SQL, Pipelines, ETL/ELT, Spark, Kafka, Datenmodellierung und System Design mit ausführlichen Antworten und Codebeispielen.