Apache Spark avec Python : construire des pipelines de données étape par étape

Tutoriel pratique PySpark couvrant les opérations DataFrame, la construction de pipelines ETL et les fonctionnalités de Spark 4.0. Inclut des exemples de code prêts pour la production destinés aux data engineers qui préparent leurs entretiens techniques.

Illustration d'un tutoriel de pipeline de données Apache Spark avec Python montrant le flux et les étapes de traitement

Apache Spark reste le moteur de traitement distribué dominant pour les pipelines de données à grande échelle en 2026. Associé à PySpark, il offre une API Python native qui gère les charges batch et streaming sur des clusters de toute taille sans sacrifier les performances.

Ce tutoriel détaille la construction d'un pipeline ETL complet avec PySpark, de l'ingestion brute à la sortie nettoyée, en s'appuyant sur les fonctionnalités de Spark 4.0 comme l'API Python Data Source et le tracé natif des DataFrames.

Référence rapide

Les DataFrames PySpark sont immuables. Chaque transformation retourne un nouveau DataFrame sans modifier l'original. Ce design active l'évaluation paresseuse de Spark : les transformations ne s'exécutent que lorsqu'une action (.show(), .write() ou .collect()) déclenche le plan de calcul.

Configurer un environnement PySpark 4.0

Avant de construire le moindre pipeline, la session Spark nécessite une configuration soignée. Spark 4.0 active le mode ANSI par défaut, ce qui impose une sémantique SQL plus stricte — les dépassements numériques lèvent désormais des exceptions au lieu d'être silencieusement tronqués.

python
# spark_setup.py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ETLPipeline")
    .config("spark.sql.adaptive.enabled", "true")       # AQE for runtime optimization
    .config("spark.sql.shuffle.partitions", "200")       # Tune based on cluster size
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Verify Spark version
print(spark.version)  # 4.0.0

L'Adaptive Query Execution (AQE), activée ici, optimise dynamiquement les partitions de shuffle et les stratégies de jointure à l'exécution. Spark 4.0 apporte des gains de 20 à 50 % sur les charges ETL classiques comparé à Spark 3.x, essentiellement grâce aux améliorations d'AQE et à l'optimiseur Catalyst.

Lire des données brutes avec l'API DataFrame

Les DataFrames PySpark masquent le calcul distribué derrière une API tabulaire familière. La lecture depuis différentes sources — CSV, Parquet, JSON, bases de données — suit un schéma cohérent.

python
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=False),
    StructField("product_code", StringType(), nullable=True),
    StructField("amount", DoubleType(), nullable=True),
    StructField("order_date", TimestampType(), nullable=True),
    StructField("region", StringType(), nullable=True),
])

# Read CSV with explicit schema
raw_orders = (
    spark.read
    .schema(order_schema)                    # Skip inference, enforce types
    .option("header", "true")                # First row contains column names
    .option("mode", "DROPMALFORMED")         # Skip rows that don't match schema
    .csv("s3a://data-lake/raw/orders/")
)

# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")

raw_orders.printSchema()
raw_orders.show(5, truncate=False)

Définir un schéma explicite constitue une bonne pratique en production. L'inférence de schéma impose un scan complet de la source, coûteux sur de gros volumes, et peut se tromper sur les types (interpréter des codes postaux comme des entiers, par exemple).

Nettoyer et transformer les DataFrames

Les données brutes réclament toujours du nettoyage. Les transformations PySpark s'enchaînent naturellement avec l'API DataFrame, et puisque les DataFrames sont immuables, chaque étape produit un nouveau DataFrame sans modifier l'original.

python
# clean_orders.py
from pyspark.sql import functions as F

cleaned_orders = (
    raw_orders
    .filter(F.col("order_id").isNotNull())                     # Remove null primary keys
    .filter(F.col("amount") > 0)                               # Filter invalid amounts
    .withColumn("customer_id", F.trim(F.upper(F.col("customer_id"))))  # Normalize IDs
    .withColumn("order_date", F.to_date(F.col("order_date")))  # Cast to date type
    .withColumn("year_month",                                   # Extract partition key
        F.date_format(F.col("order_date"), "yyyy-MM"))
    .dropDuplicates(["order_id"])                               # Deduplicate by order ID
)

print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")

L'ordre des opérations est crucial dans une chaîne de transformations. Normaliser customer_id avant la déduplication garantit un matching cohérent. Extraire year_month après le parsing de la date évite les valeurs nulles dans la colonne de partitionnement.

Éviter collect() sur de gros DataFrames

Appeler .collect() rapatrie l'intégralité du DataFrame distribué dans la mémoire du driver. Sur des datasets qui dépassent la RAM du driver, cela provoque une OutOfMemoryError. Préférer .show(), .take() ou .toPandas() sur des sous-ensembles préalablement filtrés.

Joindre et agréger plusieurs sources

Un vrai pipeline combine des données issues de sources multiples. PySpark supporte tous les types de jointures standards, et l'AQE de Spark 4.0 sélectionne automatiquement les broadcast joins pour les petites tables sans indication manuelle.

python
# enrich_orders.py
# Join orders with product catalog
enriched = (
    cleaned_orders
    .join(products, on="product_code", how="left")      # Keep all orders, even unmatched
    .select(
        "order_id",
        "customer_id",
        "product_code",
        F.col("name").alias("product_name"),              # Rename for clarity
        "amount",
        F.col("category"),                                # From products table
        "order_date",
        "year_month",
    )
)

# Aggregate: monthly revenue per product category
monthly_revenue = (
    enriched
    .groupBy("year_month", "category")
    .agg(
        F.sum("amount").alias("total_revenue"),            # Sum all order amounts
        F.countDistinct("order_id").alias("order_count"),  # Unique orders
        F.avg("amount").alias("avg_order_value"),          # Average basket size
    )
    .orderBy(F.desc("total_revenue"))
)

monthly_revenue.show(10)

La jointure left conserve toutes les commandes même lorsqu'un code produit n'a pas de correspondance dans le catalogue — un scénario fréquent pendant les migrations de données ou quand le catalogue produit accuse du retard sur les systèmes transactionnels.

Prêt à réussir tes entretiens Data Engineering ?

Entraîne-toi avec nos simulateurs interactifs, fiches express et tests techniques.

Écrire une sortie optimisée avec partitionnement

L'étape finale d'un pipeline ETL écrit les données transformées vers une couche de stockage cible. Partitionner la sortie selon les colonnes fréquemment interrogées réduit drastiquement les temps de lecture pour les consommateurs downstream.

python
# write_output.py
# Write enriched data partitioned by year_month
(
    enriched
    .repartition("year_month")                             # Align partitions with output
    .write
    .mode("overwrite")                                     # Replace existing partition data
    .partitionBy("year_month")                             # Physical directory partitioning
    .parquet("s3a://data-lake/curated/enriched_orders/")
)

# Write aggregated metrics as a single compact file
(
    monthly_revenue
    .coalesce(1)                                           # Single output file for small results
    .write
    .mode("overwrite")
    .parquet("s3a://data-lake/curated/monthly_revenue/")
)

print("Pipeline complete — output written to curated layer")

Utiliser .repartition() avant .partitionBy() assure que chaque partition physique contient un nombre raisonnable de fichiers. Sans cela, Spark peut produire des centaines de minuscules fichiers par partition — un anti-pattern classique appelé « small files problem » qui dégrade les performances de lecture sur HDFS et les object stores.

Spark 4.0 : l'API Python Data Source pour les connecteurs sur-mesure

Spark 4.0 introduit l'API Python Data Source, supprimant l'obligation d'écrire des connecteurs personnalisés en Java ou Scala. L'intégration avec des systèmes propriétaires, des API REST ou des formats de fichiers de niche s'en trouve largement simplifiée.

python
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class APIDataSource(DataSource):
    """Custom data source reading from an internal REST API."""

    @classmethod
    def name(cls) -> str:
        return "internal_api"                              # Registered source name

    def schema(self) -> StructType:
        return StructType([                                # Define output schema
            StructField("id", IntegerType()),
            StructField("name", StringType()),
            StructField("status", StringType()),
        ])

    def reader(self, schema: StructType) -> "APIDataSourceReader":
        return APIDataSourceReader(self.options)           # Pass options to reader

class APIDataSourceReader(DataSourceReader):
    def __init__(self, options):
        self.endpoint = options.get("endpoint", "")        # API endpoint URL

    def read(self, partition):
        import requests                                    # Import inside read for serialization
        response = requests.get(self.endpoint)
        for record in response.json():
            yield (record["id"], record["name"], record["status"])

# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()

L'API Python Data Source supporte les lectures batch et streaming. Pour un usage en production, ajouter de la gestion d'erreurs dans read() et implémenter un parallélisme au niveau des partitions améliore sensiblement le débit.

Spark Connect dans 4.0

Spark Connect découple les applications clientes du cluster Spark. Le client PySpark léger (pyspark-client) ne pèse que 1,5 Mo contre 355 Mo pour le package PySpark complet. Cela permet de lancer des jobs Spark depuis n'importe quel IDE ou notebook sans installation Spark locale.

Checklist d'optimisation des pipelines PySpark

Construire un pipeline qui fonctionne n'est qu'une partie du défi. Le rendre performant à l'échelle exige une attention particulière au partitionnement, au caching et aux stratégies de jointure.

| Technique | Quand l'appliquer | Impact | |-----------|---------------|--------| | Schémas explicites | À chaque opération de lecture | Élimine le scan d'inférence de schéma | | Partition pruning | Filtrage de datasets partitionnés | Ignore complètement les partitions non pertinentes | | Broadcast joins | Petite table (< 10 Mo) jointe à une grande table | Évite un shuffle coûteux | | Caching | DataFrame réutilisé dans plusieurs actions | Empêche le recalcul | | Coalesce vs Repartition | Réduction du nombre de partitions | coalesce évite un shuffle complet | | AQE | Toujours (activé par défaut dans Spark 4.0) | Optimisation à l'exécution des jointures et shuffles |

Le suivi des performances via l'UI Spark reste indispensable. La visualisation du DAG révèle les frontières de shuffle, et l'onglet SQL affiche le plan physique choisi par l'optimiseur Catalyst.

Conclusion

  • PySpark 4.0 offre une API Python mature pour construire des pipelines ETL à n'importe quelle échelle, avec la conformité SQL ANSI activée par défaut
  • La définition explicite des schémas, un partitionnement soigné et l'AQE éliminent les goulots d'étranglement les plus fréquents en production
  • La nouvelle API Python Data Source fait tomber la barrière Java/Scala pour les connecteurs sur-mesure — API REST, formats propriétaires et sources streaming s'intègrent désormais en pur Python
  • L'ordre des transformations compte : normaliser les identifiants avant la déduplication, parser les dates avant d'extraire les clés de partition
  • Les questions d'entretien Data Engineering abordent fréquemment les internals de Spark comme l'évaluation paresseuse, la mécanique du shuffle et les stratégies de partitionnement — maîtriser ces fondamentaux de pipeline prépare directement à ces discussions
  • Explorer les patterns ETL et ELT pour une couverture plus approfondie des décisions d'architecture de pipeline

Passe à la pratique !

Teste tes connaissances avec nos simulateurs d'entretien et tests techniques.

Tags

#apache-spark
#pyspark
#data-pipeline
#etl
#python
#spark-4
#data-engineering

Partager

Articles similaires