Apache Spark con Python: costruire pipeline dati passo dopo passo con PySpark 4.0

Guida pratica alla costruzione di pipeline dati ETL con PySpark e Spark 4.0: dalla lettura dei dati grezzi alla scrittura ottimizzata, passando per trasformazioni, join, aggregazioni e la nuova Python Data Source API.

Tutorial Apache Spark con Python per pipeline dati – illustrazione del flusso dati e fasi di elaborazione

Apache Spark si conferma nel 2026 il motore di elaborazione distribuita dominante per le pipeline dati su larga scala. Abbinato a PySpark, offre un'API nativa Python capace di gestire workload batch e streaming su cluster di qualsiasi dimensione, senza compromettere le prestazioni.

Questo tutorial guida alla costruzione di una pipeline ETL completa con PySpark, dall'acquisizione dei dati grezzi fino all'output pulito e ottimizzato, sfruttando le novita di Spark 4.0 come la Python Data Source API e il plotting nativo dei DataFrame.

Il percorso copre ogni fase fondamentale: configurazione dell'ambiente, lettura dei dati, pulizia, trasformazioni, join tra sorgenti multiple, aggregazioni e scrittura partizionata. Ogni passaggio include codice eseguibile e spiegazioni delle scelte architetturali che distinguono una pipeline di produzione da un semplice prototipo.

Riferimento rapido

I DataFrame PySpark sono immutabili. Ogni trasformazione restituisce un nuovo DataFrame, lasciando inalterato l'originale. Questo design consente la lazy evaluation di Spark: le trasformazioni vengono eseguite solo quando un'azione (come .show(), .write() o .collect()) attiva il piano di computazione.

Configurazione dell'ambiente PySpark 4.0

Prima di costruire qualsiasi pipeline, la sessione Spark necessita di una configurazione adeguata. Spark 4.0 abilita la modalita ANSI per impostazione predefinita, il che impone una semantica SQL piu rigorosa: gli overflow numerici ora generano eccezioni invece di effettuare silenziosamente un wrap-around.

python
# spark_setup.py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ETLPipeline")
    .config("spark.sql.adaptive.enabled", "true")       # AQE for runtime optimization
    .config("spark.sql.shuffle.partitions", "200")       # Tune based on cluster size
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Verify Spark version
print(spark.version)  # 4.0.0

L'Adaptive Query Execution (AQE), abilitata qui, ottimizza dinamicamente le partizioni di shuffle e le strategie di join a runtime. Spark 4.0 introduce miglioramenti del 20-50% sulle prestazioni dei tipici workload ETL rispetto a Spark 3.x, in gran parte grazie ai perfezionamenti di AQE e dell'ottimizzatore Catalyst.

Lettura dei dati grezzi con l'API DataFrame

I DataFrame PySpark astraggono la computazione distribuita dietro un'API tabulare familiare. La lettura da diverse sorgenti -- CSV, Parquet, JSON, database -- segue un pattern coerente.

python
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=False),
    StructField("product_code", StringType(), nullable=True),
    StructField("amount", DoubleType(), nullable=True),
    StructField("order_date", TimestampType(), nullable=True),
    StructField("region", StringType(), nullable=True),
])

# Read CSV with explicit schema
raw_orders = (
    spark.read
    .schema(order_schema)                    # Skip inference, enforce types
    .option("header", "true")                # First row contains column names
    .option("mode", "DROPMALFORMED")         # Skip rows that don't match schema
    .csv("s3a://data-lake/raw/orders/")
)

# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")

raw_orders.printSchema()
raw_orders.show(5, truncate=False)

La definizione esplicita dello schema rappresenta una best practice in produzione. L'inferenza dello schema richiede una scansione completa della sorgente dati, operazione costosa su dataset di grandi dimensioni e che puo indovinare i tipi in modo errato -- ad esempio interpretando i codici postali come interi.

Pulizia e trasformazione dei DataFrame

I dati grezzi necessitano sempre di pulizia. Le trasformazioni PySpark si concatenano in modo naturale attraverso l'API DataFrame, e poiche i DataFrame sono immutabili, ogni passaggio produce un nuovo DataFrame senza modificare l'originale.

python
# clean_orders.py
from pyspark.sql import functions as F

cleaned_orders = (
    raw_orders
    .filter(F.col("order_id").isNotNull())                     # Remove null primary keys
    .filter(F.col("amount") > 0)                               # Filter invalid amounts
    .withColumn("customer_id", F.trim(F.upper(F.col("customer_id"))))  # Normalize IDs
    .withColumn("order_date", F.to_date(F.col("order_date")))  # Cast to date type
    .withColumn("year_month",                                   # Extract partition key
        F.date_format(F.col("order_date"), "yyyy-MM"))
    .dropDuplicates(["order_id"])                               # Deduplicate by order ID
)

print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")

L'ordine delle operazioni conta nelle catene di trasformazione. Normalizzare customer_id prima della deduplicazione garantisce un matching coerente. Estrarre year_month dopo il parsing della data evita valori null nella colonna di partizione.

Attenzione a collect() sui DataFrame di grandi dimensioni

Chiamare .collect() trasferisce l'intero DataFrame distribuito nella memoria del driver. Su dataset che superano la RAM del driver, questo causa un OutOfMemoryError. Utilizzare .show(), .take() oppure .toPandas() su sottoinsiemi pre-filtrati.

Join e aggregazioni tra sorgenti multiple

Le pipeline reali combinano dati provenienti da sorgenti diverse. PySpark supporta tutti i tipi di join standard, e l'AQE di Spark 4.0 seleziona automaticamente i broadcast join per le tabelle piccole senza bisogno di hint manuali.

python
# enrich_orders.py
# Join orders with product catalog
enriched = (
    cleaned_orders
    .join(products, on="product_code", how="left")      # Keep all orders, even unmatched
    .select(
        "order_id",
        "customer_id",
        "product_code",
        F.col("name").alias("product_name"),              # Rename for clarity
        "amount",
        F.col("category"),                                # From products table
        "order_date",
        "year_month",
    )
)

# Aggregate: monthly revenue per product category
monthly_revenue = (
    enriched
    .groupBy("year_month", "category")
    .agg(
        F.sum("amount").alias("total_revenue"),            # Sum all order amounts
        F.countDistinct("order_id").alias("order_count"),  # Unique orders
        F.avg("amount").alias("avg_order_value"),          # Average basket size
    )
    .orderBy(F.desc("total_revenue"))
)

monthly_revenue.show(10)

Il join di tipo left preserva tutti gli ordini anche quando un codice prodotto non ha corrispondenza nel catalogo -- scenario comune durante le migrazioni dati o quando i cataloghi prodotto non sono allineati con i sistemi transazionali.

Pronto a superare i tuoi colloqui su Data Engineering?

Pratica con i nostri simulatori interattivi, flashcards e test tecnici.

Scrittura ottimizzata dell'output con partizionamento

L'ultimo passaggio di una pipeline ETL scrive i dati trasformati su un livello di storage di destinazione. Partizionare l'output per le colonne interrogate piu frequentemente riduce drasticamente i tempi di lettura per i consumatori a valle.

python
# write_output.py
# Write enriched data partitioned by year_month
(
    enriched
    .repartition("year_month")                             # Align partitions with output
    .write
    .mode("overwrite")                                     # Replace existing partition data
    .partitionBy("year_month")                             # Physical directory partitioning
    .parquet("s3a://data-lake/curated/enriched_orders/")
)

# Write aggregated metrics as a single compact file
(
    monthly_revenue
    .coalesce(1)                                           # Single output file for small results
    .write
    .mode("overwrite")
    .parquet("s3a://data-lake/curated/monthly_revenue/")
)

print("Pipeline complete — output written to curated layer")

Utilizzare .repartition() prima di .partitionBy() assicura che ogni partizione fisica contenga un numero ragionevole di file. Senza questo accorgimento, Spark potrebbe produrre centinaia di file minuscoli per partizione -- un anti-pattern noto come "small files problem" che degrada le prestazioni di lettura su HDFS e object store.

Spark 4.0: Python Data Source API per connettori personalizzati

Spark 4.0 introduce la Python Data Source API, eliminando il precedente requisito di scrivere connettori personalizzati in Java o Scala. Questo semplifica notevolmente l'integrazione con sistemi proprietari, API REST o formati di file di nicchia.

python
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class APIDataSource(DataSource):
    """Custom data source reading from an internal REST API."""

    @classmethod
    def name(cls) -> str:
        return "internal_api"                              # Registered source name

    def schema(self) -> StructType:
        return StructType([                                # Define output schema
            StructField("id", IntegerType()),
            StructField("name", StringType()),
            StructField("status", StringType()),
        ])

    def reader(self, schema: StructType) -> "APIDataSourceReader":
        return APIDataSourceReader(self.options)           # Pass options to reader

class APIDataSourceReader(DataSourceReader):
    def __init__(self, options):
        self.endpoint = options.get("endpoint", "")        # API endpoint URL

    def read(self, partition):
        import requests                                    # Import inside read for serialization
        response = requests.get(self.endpoint)
        for record in response.json():
            yield (record["id"], record["name"], record["status"])

# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()

La Python Data Source API supporta sia letture batch che streaming. Per l'utilizzo in produzione, aggiungere la gestione degli errori all'interno di read() e implementare il parallelismo a livello di partizione migliora significativamente il throughput.

Spark Connect in 4.0

Spark Connect disaccoppia le applicazioni client dal cluster Spark. Il client PySpark leggero (pyspark-client) pesa solo 1,5 MB rispetto ai 355 MB del pacchetto PySpark completo. Questo consente di eseguire job Spark da qualsiasi IDE o notebook senza un'installazione locale di Spark.

Checklist di ottimizzazione delle prestazioni per pipeline PySpark

Costruire una pipeline funzionante rappresenta solo una parte della sfida. Renderla performante su larga scala richiede attenzione al partizionamento, al caching e alle strategie di join.

| Tecnica | Quando applicarla | Impatto | |---------|-------------------|--------| | Schema espliciti | Ogni operazione di lettura | Elimina la scansione per l'inferenza dello schema | | Partition pruning | Filtraggio su dataset partizionati | Salta interamente le partizioni irrilevanti | | Broadcast join | Tabella piccola (< 10 MB) unita a tabella grande | Evita shuffle costosi | | Caching | DataFrame riutilizzato in piu azioni | Previene la ricalcolazione | | Coalesce vs Repartition | Riduzione del numero di partizioni | coalesce evita un full shuffle | | AQE | Sempre (default in Spark 4.0) | Ottimizzazione a runtime di join e shuffle |

Il monitoraggio delle prestazioni della pipeline attraverso la Spark UI resta essenziale. La visualizzazione del DAG rivela i confini degli shuffle, e la scheda SQL mostra il piano fisico scelto dall'ottimizzatore Catalyst.

Conclusione

  • PySpark 4.0 offre un'API matura e Python-first per la costruzione di pipeline ETL a qualsiasi scala, con la conformita ANSI SQL abilitata per impostazione predefinita
  • La definizione esplicita dello schema, il partizionamento corretto e l'AQE eliminano i colli di bottiglia piu comuni nelle pipeline di produzione
  • La nuova Python Data Source API rimuove la barriera Java/Scala per i connettori personalizzati: API REST, formati proprietari e sorgenti streaming possono essere integrati interamente in Python
  • L'ordine delle trasformazioni conta: normalizzare gli identificativi prima della deduplicazione, analizzare le date prima di estrarre le chiavi di partizione
  • Le domande sui colloqui di Data Engineering coprono frequentemente gli aspetti interni di Spark come la lazy evaluation, la meccanica degli shuffle e le strategie di partizionamento -- comprendere questi fondamentali delle pipeline prepara direttamente a quelle conversazioni
  • Approfondire i pattern ETL ed ELT per una copertura piu ampia delle decisioni architetturali delle pipeline

Inizia a praticare!

Metti alla prova le tue conoscenze con i nostri simulatori di colloquio e test tecnici.

Tag

#apache-spark
#pyspark
#data-pipeline
#etl
#python
#spark-4
#data-engineering

Condividi

Articoli correlati