Apache Spark z Pythonem: Budowanie Potoków Danych Krok po Kroku
Praktyczny poradnik PySpark obejmujący operacje na DataFrame, budowę potoków ETL oraz funkcje Spark 4.0. Zawiera gotowe do produkcji przykłady kodu dla inżynierów danych przygotowujących się do rozmów technicznych.

Apache Spark pozostaje dominującym silnikiem przetwarzania rozproszonego dla wielkoskalowych potoków danych w 2026 roku. W połączeniu z PySpark oferuje natywne API Pythona, które obsługuje zarówno przetwarzanie wsadowe, jak i strumieniowe na klastrach dowolnej wielkości bez utraty wydajności.
Ten poradnik przeprowadza przez budowę kompletnego potoku danych ETL w PySpark — od surowych danych wejściowych po oczyszczone wyniki — z wykorzystaniem funkcji Spark 4.0, takich jak Python Data Source API oraz natywne wykresy DataFrame.
DataFrame w PySpark są niemutowalne. Każda transformacja zwraca nowy DataFrame, pozostawiając oryginał bez zmian. Ten projekt umożliwia leniwą ewaluację Spark: transformacje są wykonywane dopiero wtedy, gdy akcja (np. .show(), .write() lub .collect()) uruchomi plan obliczeniowy.
Konfiguracja Środowiska PySpark 4.0
Przed budową jakiegokolwiek potoku sesja Spark wymaga odpowiedniej konfiguracji. Spark 4.0 domyślnie włącza tryb ANSI, który wymusza bardziej restrykcyjną semantykę SQL — przepełnienia numeryczne generują teraz wyjątki zamiast cichego zawijania wartości.
# spark_setup.py
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("ETLPipeline")
.config("spark.sql.adaptive.enabled", "true") # AQE for runtime optimization
.config("spark.sql.shuffle.partitions", "200") # Tune based on cluster size
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
)
# Verify Spark version
print(spark.version) # 4.0.0Adaptive Query Execution (AQE), włączone powyżej, dynamicznie optymalizuje partycje shuffle i strategie joinów w czasie wykonywania. Spark 4.0 zapewnia przyspieszenie rzędu 20-50% na typowych obciążeniach ETL w porównaniu ze Spark 3.x, głównie dzięki ulepszeniom AQE i optymalizatora Catalyst.
Odczyt Surowych Danych za Pomocą DataFrame API
DataFrame w PySpark abstrahują rozproszone obliczenia za znajomym tabelarycznym interfejsem. Odczyt z różnych źródeł — CSV, Parquet, JSON, baz danych — odbywa się według spójnego wzorca.
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
StructField("order_id", StringType(), nullable=False),
StructField("customer_id", StringType(), nullable=False),
StructField("product_code", StringType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
StructField("order_date", TimestampType(), nullable=True),
StructField("region", StringType(), nullable=True),
])
# Read CSV with explicit schema
raw_orders = (
spark.read
.schema(order_schema) # Skip inference, enforce types
.option("header", "true") # First row contains column names
.option("mode", "DROPMALFORMED") # Skip rows that don't match schema
.csv("s3a://data-lake/raw/orders/")
)
# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")
raw_orders.printSchema()
raw_orders.show(5, truncate=False)Jawne definiowanie schematu to najlepsza praktyka produkcyjna. Inferencja schematu wymaga pełnego skanu źródła danych, co jest kosztowne na dużych zbiorach i może prowadzić do błędnego rozpoznania typów (np. interpretacji kodów pocztowych jako liczb całkowitych).
Czyszczenie i Transformacja DataFrame
Surowe dane zawsze wymagają czyszczenia. Transformacje PySpark naturalnie łączą się w łańcuchy za pomocą DataFrame API, a ponieważ DataFrame są niemutowalne, każdy krok tworzy nowy DataFrame bez modyfikacji oryginału.
# clean_orders.py
from pyspark.sql import functions as F
cleaned_orders = (
raw_orders
.filter(F.col("order_id").isNotNull()) # Remove null primary keys
.filter(F.col("amount") > 0) # Filter invalid amounts
.withColumn("customer_id", F.trim(F.upper(F.col("customer_id")))) # Normalize IDs
.withColumn("order_date", F.to_date(F.col("order_date"))) # Cast to date type
.withColumn("year_month", # Extract partition key
F.date_format(F.col("order_date"), "yyyy-MM"))
.dropDuplicates(["order_id"]) # Deduplicate by order ID
)
print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")Kolejność transformacji ma znaczenie. Normalizacja customer_id przed deduplikacją zapewnia spójne dopasowania. Ekstrakcja year_month po parsowaniu daty pozwala uniknąć wartości null w kolumnie partycji.
Wywołanie .collect() pobiera cały rozproszony DataFrame do pamięci sterownika. Na zbiorach danych przekraczających dostępny RAM sterownika powoduje to OutOfMemoryError. Zamiast tego należy używać .show(), .take() lub .toPandas() na wstępnie przefiltrowanych podzbiorach.
Łączenie i Agregacja Danych z Wielu Źródeł
Rzeczywiste potoki łączą dane z wielu źródeł. PySpark obsługuje wszystkie standardowe typy joinów, a AQE w Spark 4.0 automatycznie wybiera broadcast join dla małych tabel bez ręcznych podpowiedzi.
# enrich_orders.py
# Join orders with product catalog
enriched = (
cleaned_orders
.join(products, on="product_code", how="left") # Keep all orders, even unmatched
.select(
"order_id",
"customer_id",
"product_code",
F.col("name").alias("product_name"), # Rename for clarity
"amount",
F.col("category"), # From products table
"order_date",
"year_month",
)
)
# Aggregate: monthly revenue per product category
monthly_revenue = (
enriched
.groupBy("year_month", "category")
.agg(
F.sum("amount").alias("total_revenue"), # Sum all order amounts
F.countDistinct("order_id").alias("order_count"), # Unique orders
F.avg("amount").alias("avg_order_value"), # Average basket size
)
.orderBy(F.desc("total_revenue"))
)
monthly_revenue.show(10)Join typu left zachowuje wszystkie zamówienia, nawet gdy kod produktu nie ma odpowiednika w katalogu — częsty scenariusz podczas migracji danych lub gdy katalogi produktów nie nadążają za systemami transakcyjnymi.
Gotowy na rozmowy o Data Engineering?
Ćwicz z naszymi interaktywnymi symulatorami, flashcards i testami technicznymi.
Zapis Zoptymalizowanych Wyników z Partycjonowaniem
Ostatni krok w potoku ETL to zapis przekształconych danych do docelowej warstwy przechowywania. Partycjonowanie wyników według często odpytywanych kolumn drastycznie skraca czas odczytu dla konsumentów danych.
# write_output.py
# Write enriched data partitioned by year_month
(
enriched
.repartition("year_month") # Align partitions with output
.write
.mode("overwrite") # Replace existing partition data
.partitionBy("year_month") # Physical directory partitioning
.parquet("s3a://data-lake/curated/enriched_orders/")
)
# Write aggregated metrics as a single compact file
(
monthly_revenue
.coalesce(1) # Single output file for small results
.write
.mode("overwrite")
.parquet("s3a://data-lake/curated/monthly_revenue/")
)
print("Pipeline complete — output written to curated layer")Użycie .repartition() przed .partitionBy() zapewnia, że każda fizyczna partycja zawiera rozsądną liczbę plików. Bez tego Spark może wygenerować setki małych plików na partycję — powszechny anty-wzorzec znany jako "problem małych plików", który obniża wydajność odczytu na HDFS i obiektowych magazynach danych.
Spark 4.0: Python Data Source API dla Niestandardowych Konektorów
Spark 4.0 wprowadza Python Data Source API, eliminując dotychczasowy wymóg pisania niestandardowych konektorów w Javie lub Scali. Upraszcza to integrację z zastrzeżonymi systemami, interfejsami REST API i nietypowymi formatami plików.
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
class APIDataSource(DataSource):
"""Custom data source reading from an internal REST API."""
@classmethod
def name(cls) -> str:
return "internal_api" # Registered source name
def schema(self) -> StructType:
return StructType([ # Define output schema
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("status", StringType()),
])
def reader(self, schema: StructType) -> "APIDataSourceReader":
return APIDataSourceReader(self.options) # Pass options to reader
class APIDataSourceReader(DataSourceReader):
def __init__(self, options):
self.endpoint = options.get("endpoint", "") # API endpoint URL
def read(self, partition):
import requests # Import inside read for serialization
response = requests.get(self.endpoint)
for record in response.json():
yield (record["id"], record["name"], record["status"])
# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()Python Data Source API obsługuje zarówno odczyt wsadowy, jak i strumieniowy. W zastosowaniach produkcyjnych dodanie obsługi błędów wewnątrz read() oraz implementacja równoległości na poziomie partycji znacząco poprawia przepustowość.
Spark Connect oddziela aplikacje klienckie od klastra Spark. Lekki klient PySpark (pyspark-client) waży zaledwie 1,5 MB w porównaniu z pełnym pakietem PySpark o wadze 355 MB. Umożliwia to uruchamianie zadań Spark z dowolnego IDE lub notatnika bez lokalnej instalacji Spark.
Lista Kontrolna Optymalizacji Wydajności Potoków PySpark
Zbudowanie działającego potoku to tylko część wyzwania. Osiągnięcie wydajności na dużą skalę wymaga uwagi na partycjonowanie, buforowanie i strategie joinów.
| Technika | Kiedy Stosować | Wpływ |
|----------|---------------|-------|
| Jawne schematy | Każda operacja odczytu | Eliminuje skan inferencji schematu |
| Przycinanie partycji | Filtrowanie partycjonowanych zbiorów | Pomija nieistotne partycje danych |
| Broadcast join | Mała tabela (< 10 MB) łączona z dużą | Unika kosztownego shuffle |
| Buforowanie | DataFrame używany w wielu akcjach | Zapobiega ponownemu obliczaniu |
| Coalesce vs Repartition | Zmniejszanie liczby partycji | coalesce unika pełnego shuffle |
| AQE | Zawsze (domyślnie w Spark 4.0) | Optymalizacja joinów i shuffle w runtime |
Monitorowanie wydajności potoku przez Spark UI pozostaje kluczowe. Wizualizacja DAG ujawnia granice shuffle, a zakładka SQL pokazuje fizyczny plan wybrany przez optymalizator Catalyst.
Podsumowanie
- PySpark 4.0 dostarcza dojrzałe, pythonowe API do budowy potoków ETL dowolnej skali, z domyślną zgodnością z ANSI SQL
- Jawne definiowanie schematów, prawidłowe partycjonowanie i AQE eliminują najczęstsze problemy wydajnościowe w potokach produkcyjnych
- Nowe Python Data Source API usuwa barierę Java/Scala dla niestandardowych konektorów — interfejsy REST API, zastrzeżone formaty i źródła strumieniowe można integrować w czystym Pythonie
- Kolejność transformacji ma znaczenie: normalizacja identyfikatorów przed deduplikacją, parsowanie dat przed ekstrakcją kluczy partycji
- Pytania rekrutacyjne z inżynierii danych często obejmują wewnętrzne mechanizmy Spark, takie jak leniwa ewaluacja, mechanika shuffle i strategie partycjonowania — zrozumienie podstaw potoków bezpośrednio przygotowuje do tych rozmów
- Warto zapoznać się z wzorcami ETL i ELT dla głębszego zrozumienia decyzji architektonicznych dotyczących potoków
Zacznij ćwiczyć!
Sprawdź swoją wiedzę z naszymi symulatorami rozmów i testami technicznymi.
Tagi
Udostępnij
Powiązane artykuły

ETL vs ELT w 2026: Architektura potoków danych od podstaw
Porównanie ETL i ELT dla nowoczesnych potoków danych. Różnice architektoniczne, kompromisy wydajnościowe i zastosowania z Snowflake, BigQuery i dbt.

25 najczęściej zadawanych pytań na rozmowie z Data Engineeringu w 2026
Kompletny przewodnik po 25 najczęściej zadawanych pytaniach na rozmowach kwalifikacyjnych z data engineeringu w 2026 roku. Obejmuje SQL, ETL/ELT, Spark, Kafka, modelowanie danych, orkiestrację pipeline'ów i projektowanie systemów z szczegółowymi odpowiedziami i przykładami kodu.