Apache Spark con Python: construir pipelines de datos paso a paso

Tutorial práctico de PySpark que cubre operaciones con DataFrame, construcción de pipelines ETL y las novedades de Spark 4.0. Incluye ejemplos de código listos para producción orientados a data engineers que preparan entrevistas técnicas.

Ilustración del tutorial de pipeline de datos Apache Spark con Python mostrando el flujo y las etapas de procesamiento

Apache Spark sigue siendo el motor de procesamiento distribuido dominante para pipelines de datos a gran escala en 2026. Combinado con PySpark, ofrece una API nativa de Python que gestiona cargas batch y streaming en clusters de cualquier tamaño sin sacrificar el rendimiento.

Este tutorial recorre la construcción de un pipeline ETL completo con PySpark, desde la ingesta en crudo hasta la salida limpia, usando funcionalidades de Spark 4.0 como la API Python Data Source y el plotting nativo sobre DataFrames.

Referencia rápida

Los DataFrames de PySpark son inmutables. Cada transformación devuelve un nuevo DataFrame sin alterar el original. Este diseño habilita la evaluación perezosa de Spark: las transformaciones solo se ejecutan cuando una acción (.show(), .write() o .collect()) dispara el plan de cómputo.

Configurar un entorno PySpark 4.0

Antes de construir cualquier pipeline, la sesión de Spark requiere una configuración adecuada. Spark 4.0 activa el modo ANSI por defecto, lo que impone una semántica SQL más estricta: los desbordamientos numéricos ahora lanzan excepciones en lugar de truncarse silenciosamente.

python
# spark_setup.py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ETLPipeline")
    .config("spark.sql.adaptive.enabled", "true")       # AQE for runtime optimization
    .config("spark.sql.shuffle.partitions", "200")       # Tune based on cluster size
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Verify Spark version
print(spark.version)  # 4.0.0

La Adaptive Query Execution (AQE), habilitada aquí, optimiza dinámicamente las particiones de shuffle y las estrategias de join en tiempo de ejecución. Spark 4.0 aporta mejoras del 20 al 50 % en cargas ETL típicas frente a Spark 3.x, en gran parte gracias a las mejoras de AQE y al optimizador Catalyst.

Leer datos en crudo con la API DataFrame

Los DataFrames de PySpark abstraen el cómputo distribuido detrás de una API tabular familiar. La lectura desde distintas fuentes —CSV, Parquet, JSON, bases de datos— sigue un patrón consistente.

python
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=False),
    StructField("product_code", StringType(), nullable=True),
    StructField("amount", DoubleType(), nullable=True),
    StructField("order_date", TimestampType(), nullable=True),
    StructField("region", StringType(), nullable=True),
])

# Read CSV with explicit schema
raw_orders = (
    spark.read
    .schema(order_schema)                    # Skip inference, enforce types
    .option("header", "true")                # First row contains column names
    .option("mode", "DROPMALFORMED")         # Skip rows that don't match schema
    .csv("s3a://data-lake/raw/orders/")
)

# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")

raw_orders.printSchema()
raw_orders.show(5, truncate=False)

Definir el esquema de forma explícita es una buena práctica en producción. La inferencia de esquema obliga a escanear por completo la fuente, algo costoso en datasets grandes, y puede equivocarse en los tipos (interpretar códigos postales como enteros, por ejemplo).

Limpiar y transformar DataFrames

Los datos en crudo siempre necesitan limpieza. Las transformaciones de PySpark se encadenan de forma natural con la API DataFrame y, al ser inmutables, cada paso produce un nuevo DataFrame sin modificar el original.

python
# clean_orders.py
from pyspark.sql import functions as F

cleaned_orders = (
    raw_orders
    .filter(F.col("order_id").isNotNull())                     # Remove null primary keys
    .filter(F.col("amount") > 0)                               # Filter invalid amounts
    .withColumn("customer_id", F.trim(F.upper(F.col("customer_id"))))  # Normalize IDs
    .withColumn("order_date", F.to_date(F.col("order_date")))  # Cast to date type
    .withColumn("year_month",                                   # Extract partition key
        F.date_format(F.col("order_date"), "yyyy-MM"))
    .dropDuplicates(["order_id"])                               # Deduplicate by order ID
)

print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")

El orden importa en las cadenas de transformación. Normalizar customer_id antes de deduplicar garantiza un matching coherente. Extraer year_month después de parsear la fecha evita valores nulos en la columna de partición.

Evitar collect() en DataFrames grandes

Llamar a .collect() trae el DataFrame distribuido completo a la memoria del driver. En datasets que exceden la RAM del driver, esto provoca un OutOfMemoryError. Conviene usar .show(), .take() o .toPandas() sobre subconjuntos previamente filtrados.

Unir y agregar múltiples fuentes

Los pipelines reales combinan datos de varias fuentes. PySpark soporta todos los tipos de join estándar, y la AQE de Spark 4.0 selecciona automáticamente broadcast joins para tablas pequeñas sin hints manuales.

python
# enrich_orders.py
# Join orders with product catalog
enriched = (
    cleaned_orders
    .join(products, on="product_code", how="left")      # Keep all orders, even unmatched
    .select(
        "order_id",
        "customer_id",
        "product_code",
        F.col("name").alias("product_name"),              # Rename for clarity
        "amount",
        F.col("category"),                                # From products table
        "order_date",
        "year_month",
    )
)

# Aggregate: monthly revenue per product category
monthly_revenue = (
    enriched
    .groupBy("year_month", "category")
    .agg(
        F.sum("amount").alias("total_revenue"),            # Sum all order amounts
        F.countDistinct("order_id").alias("order_count"),  # Unique orders
        F.avg("amount").alias("avg_order_value"),          # Average basket size
    )
    .orderBy(F.desc("total_revenue"))
)

monthly_revenue.show(10)

El join left preserva todas las órdenes incluso cuando un código de producto no coincide en el catálogo, un escenario habitual durante migraciones de datos o cuando el catálogo va por detrás de los sistemas transaccionales.

¿Listo para aprobar tus entrevistas de Data Engineering?

Practica con nuestros simuladores interactivos, flashcards y tests técnicos.

Escribir salida optimizada con particionamiento

El paso final de un pipeline ETL escribe los datos transformados en una capa de almacenamiento destino. Particionar la salida por las columnas más consultadas reduce drásticamente los tiempos de lectura para los consumidores downstream.

python
# write_output.py
# Write enriched data partitioned by year_month
(
    enriched
    .repartition("year_month")                             # Align partitions with output
    .write
    .mode("overwrite")                                     # Replace existing partition data
    .partitionBy("year_month")                             # Physical directory partitioning
    .parquet("s3a://data-lake/curated/enriched_orders/")
)

# Write aggregated metrics as a single compact file
(
    monthly_revenue
    .coalesce(1)                                           # Single output file for small results
    .write
    .mode("overwrite")
    .parquet("s3a://data-lake/curated/monthly_revenue/")
)

print("Pipeline complete — output written to curated layer")

Usar .repartition() antes de .partitionBy() asegura que cada partición física contenga un número razonable de archivos. Sin ello, Spark puede producir cientos de archivos diminutos por partición, un anti-patrón conocido como el «small files problem» que degrada las lecturas en HDFS y object stores.

Spark 4.0: la API Python Data Source para conectores a medida

Spark 4.0 introduce la API Python Data Source, eliminando la obligación previa de escribir conectores a medida en Java o Scala. Esto simplifica la integración con sistemas propietarios, APIs REST o formatos de archivo poco comunes.

python
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class APIDataSource(DataSource):
    """Custom data source reading from an internal REST API."""

    @classmethod
    def name(cls) -> str:
        return "internal_api"                              # Registered source name

    def schema(self) -> StructType:
        return StructType([                                # Define output schema
            StructField("id", IntegerType()),
            StructField("name", StringType()),
            StructField("status", StringType()),
        ])

    def reader(self, schema: StructType) -> "APIDataSourceReader":
        return APIDataSourceReader(self.options)           # Pass options to reader

class APIDataSourceReader(DataSourceReader):
    def __init__(self, options):
        self.endpoint = options.get("endpoint", "")        # API endpoint URL

    def read(self, partition):
        import requests                                    # Import inside read for serialization
        response = requests.get(self.endpoint)
        for record in response.json():
            yield (record["id"], record["name"], record["status"])

# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()

La API Python Data Source admite lecturas batch y streaming. Para producción, añadir manejo de errores dentro de read() e implementar paralelismo a nivel de partición mejora notablemente el throughput.

Spark Connect en 4.0

Spark Connect desacopla las aplicaciones cliente del cluster de Spark. El cliente ligero de PySpark (pyspark-client) pesa apenas 1,5 MB frente a los 355 MB del paquete PySpark completo. Esto permite ejecutar trabajos Spark desde cualquier IDE o notebook sin instalación local de Spark.

Checklist de tuning para pipelines PySpark

Construir un pipeline que funcione es solo una parte del desafío. Hacerlo rendir a escala exige atención al particionamiento, el caching y las estrategias de join.

| Técnica | Cuándo aplicar | Impacto | |-----------|---------------|--------| | Esquemas explícitos | En cada operación de lectura | Elimina el escaneo de inferencia de esquema | | Partition pruning | Filtrado sobre datasets particionados | Omite particiones irrelevantes por completo | | Broadcast joins | Tabla pequeña (< 10 MB) unida a una grande | Evita un shuffle costoso | | Caching | DataFrame reutilizado en varias acciones | Previene el recómputo | | Coalesce vs Repartition | Reducir el número de particiones | coalesce evita un shuffle completo | | AQE | Siempre (por defecto en Spark 4.0) | Optimización en tiempo de ejecución de joins y shuffles |

Monitorizar el rendimiento del pipeline a través de la UI de Spark sigue siendo clave. La visualización del DAG revela las fronteras de shuffle y la pestaña SQL muestra el plan físico elegido por el optimizador Catalyst.

Conclusión

  • PySpark 4.0 ofrece una API Python madura para construir pipelines ETL a cualquier escala, con cumplimiento de SQL ANSI activado por defecto
  • La definición explícita de esquemas, un particionamiento adecuado y AQE eliminan los cuellos de botella más habituales en producción
  • La nueva API Python Data Source derriba la barrera de Java/Scala para conectores a medida: APIs REST, formatos propietarios y fuentes de streaming pueden integrarse en Python puro
  • El orden de las transformaciones importa: normalizar identificadores antes de deduplicar, parsear fechas antes de extraer claves de partición
  • Las preguntas de entrevista de Data Engineering abordan con frecuencia los internals de Spark como evaluación perezosa, mecánica de shuffle y estrategias de particionamiento: dominar estos fundamentos prepara directamente para esas conversaciones
  • Conviene explorar los patrones ETL y ELT para una cobertura más profunda de las decisiones de arquitectura de pipelines

¡Empieza a practicar!

Pon a prueba tu conocimiento con nuestros simuladores de entrevista y tests técnicos.

Etiquetas

#apache-spark
#pyspark
#data-pipeline
#etl
#python
#spark-4
#data-engineering

Compartir

Artículos relacionados