Apache Spark com Python: construindo pipelines de dados passo a passo

Tutorial prático de PySpark cobrindo operações com DataFrame, construção de pipelines ETL e recursos do Spark 4.0. Inclui exemplos de código prontos para produção voltados a engenheiros de dados que se preparam para entrevistas técnicas.

Ilustração do tutorial de pipeline de dados Apache Spark com Python mostrando o fluxo e as etapas de processamento

O Apache Spark continua sendo o motor de processamento distribuído dominante para pipelines de dados em larga escala em 2026. Combinado com o PySpark, oferece uma API nativa em Python que lida com cargas batch e streaming em clusters de qualquer tamanho sem comprometer a performance.

Este tutorial percorre a construção de um pipeline ETL completo com PySpark, da ingestão bruta à saída limpa, usando recursos do Spark 4.0 como a API Python Data Source e o plotting nativo sobre DataFrames.

Referência rápida

Os DataFrames do PySpark são imutáveis. Cada transformação retorna um novo DataFrame sem alterar o original. Esse desenho habilita a avaliação preguiçosa do Spark: as transformações só são executadas quando uma ação (.show(), .write() ou .collect()) dispara o plano de cômputo.

Configurando um ambiente PySpark 4.0

Antes de construir qualquer pipeline, a sessão Spark exige configuração adequada. O Spark 4.0 ativa o modo ANSI por padrão, o que impõe uma semântica SQL mais estrita — estouros numéricos agora lançam exceções em vez de serem silenciosamente truncados.

python
# spark_setup.py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ETLPipeline")
    .config("spark.sql.adaptive.enabled", "true")       # AQE for runtime optimization
    .config("spark.sql.shuffle.partitions", "200")       # Tune based on cluster size
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Verify Spark version
print(spark.version)  # 4.0.0

O Adaptive Query Execution (AQE), habilitado aqui, otimiza dinamicamente as partições de shuffle e as estratégias de join em tempo de execução. O Spark 4.0 traz ganhos de 20 a 50% em cargas ETL típicas em relação ao Spark 3.x, em grande parte pelas melhorias do AQE e do otimizador Catalyst.

Lendo dados brutos com a API DataFrame

Os DataFrames do PySpark abstraem a computação distribuída por trás de uma API tabular familiar. A leitura de diferentes fontes — CSV, Parquet, JSON, bancos de dados — segue um padrão consistente.

python
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=False),
    StructField("product_code", StringType(), nullable=True),
    StructField("amount", DoubleType(), nullable=True),
    StructField("order_date", TimestampType(), nullable=True),
    StructField("region", StringType(), nullable=True),
])

# Read CSV with explicit schema
raw_orders = (
    spark.read
    .schema(order_schema)                    # Skip inference, enforce types
    .option("header", "true")                # First row contains column names
    .option("mode", "DROPMALFORMED")         # Skip rows that don't match schema
    .csv("s3a://data-lake/raw/orders/")
)

# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")

raw_orders.printSchema()
raw_orders.show(5, truncate=False)

Definir o schema explicitamente é uma boa prática de produção. A inferência de schema exige uma varredura completa da fonte, o que é caro em datasets grandes e pode errar nos tipos (interpretar CEPs como inteiros, por exemplo).

Limpando e transformando DataFrames

Dados brutos sempre precisam de limpeza. As transformações do PySpark encadeiam-se naturalmente com a API DataFrame e, como os DataFrames são imutáveis, cada etapa produz um novo DataFrame sem modificar o original.

python
# clean_orders.py
from pyspark.sql import functions as F

cleaned_orders = (
    raw_orders
    .filter(F.col("order_id").isNotNull())                     # Remove null primary keys
    .filter(F.col("amount") > 0)                               # Filter invalid amounts
    .withColumn("customer_id", F.trim(F.upper(F.col("customer_id"))))  # Normalize IDs
    .withColumn("order_date", F.to_date(F.col("order_date")))  # Cast to date type
    .withColumn("year_month",                                   # Extract partition key
        F.date_format(F.col("order_date"), "yyyy-MM"))
    .dropDuplicates(["order_id"])                               # Deduplicate by order ID
)

print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")

A ordem importa em cadeias de transformação. Normalizar customer_id antes da deduplicação garante correspondência consistente. Extrair year_month após o parsing da data evita valores nulos na coluna de partição.

Evite collect() em DataFrames grandes

Chamar .collect() traz o DataFrame distribuído inteiro para a memória do driver. Em datasets que excedem a RAM do driver, isso causa OutOfMemoryError. Prefira .show(), .take() ou .toPandas() sobre subconjuntos previamente filtrados.

Juntando e agregando múltiplas fontes

Pipelines reais combinam dados de várias fontes. O PySpark suporta todos os tipos de join padrão, e o AQE do Spark 4.0 seleciona automaticamente broadcast joins para tabelas pequenas sem hints manuais.

python
# enrich_orders.py
# Join orders with product catalog
enriched = (
    cleaned_orders
    .join(products, on="product_code", how="left")      # Keep all orders, even unmatched
    .select(
        "order_id",
        "customer_id",
        "product_code",
        F.col("name").alias("product_name"),              # Rename for clarity
        "amount",
        F.col("category"),                                # From products table
        "order_date",
        "year_month",
    )
)

# Aggregate: monthly revenue per product category
monthly_revenue = (
    enriched
    .groupBy("year_month", "category")
    .agg(
        F.sum("amount").alias("total_revenue"),            # Sum all order amounts
        F.countDistinct("order_id").alias("order_count"),  # Unique orders
        F.avg("amount").alias("avg_order_value"),          # Average basket size
    )
    .orderBy(F.desc("total_revenue"))
)

monthly_revenue.show(10)

O join left preserva todos os pedidos mesmo quando um código de produto não tem correspondência no catálogo — cenário comum durante migrações de dados ou quando o catálogo de produtos fica atrás dos sistemas transacionais.

Pronto para mandar bem nas entrevistas de Data Engineering?

Pratique com nossos simuladores interativos, flashcards e testes tecnicos.

Escrevendo saída otimizada com particionamento

O passo final de um pipeline ETL grava os dados transformados em uma camada de armazenamento de destino. Particionar a saída por colunas frequentemente consultadas reduz drasticamente os tempos de leitura para os consumidores downstream.

python
# write_output.py
# Write enriched data partitioned by year_month
(
    enriched
    .repartition("year_month")                             # Align partitions with output
    .write
    .mode("overwrite")                                     # Replace existing partition data
    .partitionBy("year_month")                             # Physical directory partitioning
    .parquet("s3a://data-lake/curated/enriched_orders/")
)

# Write aggregated metrics as a single compact file
(
    monthly_revenue
    .coalesce(1)                                           # Single output file for small results
    .write
    .mode("overwrite")
    .parquet("s3a://data-lake/curated/monthly_revenue/")
)

print("Pipeline complete — output written to curated layer")

Usar .repartition() antes de .partitionBy() garante que cada partição física contenha um número razoável de arquivos. Sem isso, o Spark pode produzir centenas de arquivos minúsculos por partição — um antipadrão conhecido como "small files problem" que degrada a leitura em HDFS e object stores.

Spark 4.0: a API Python Data Source para conectores sob medida

O Spark 4.0 introduz a API Python Data Source, eliminando a antiga obrigatoriedade de escrever conectores customizados em Java ou Scala. Isso simplifica a integração com sistemas proprietários, APIs REST ou formatos de arquivo de nicho.

python
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class APIDataSource(DataSource):
    """Custom data source reading from an internal REST API."""

    @classmethod
    def name(cls) -> str:
        return "internal_api"                              # Registered source name

    def schema(self) -> StructType:
        return StructType([                                # Define output schema
            StructField("id", IntegerType()),
            StructField("name", StringType()),
            StructField("status", StringType()),
        ])

    def reader(self, schema: StructType) -> "APIDataSourceReader":
        return APIDataSourceReader(self.options)           # Pass options to reader

class APIDataSourceReader(DataSourceReader):
    def __init__(self, options):
        self.endpoint = options.get("endpoint", "")        # API endpoint URL

    def read(self, partition):
        import requests                                    # Import inside read for serialization
        response = requests.get(self.endpoint)
        for record in response.json():
            yield (record["id"], record["name"], record["status"])

# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()

A API Python Data Source suporta leituras batch e streaming. Para uso em produção, adicionar tratamento de erros dentro de read() e implementar paralelismo em nível de partição melhora significativamente o throughput.

Spark Connect no 4.0

O Spark Connect desacopla aplicações cliente do cluster Spark. O cliente PySpark leve (pyspark-client) pesa apenas 1,5 MB contra os 355 MB do pacote PySpark completo. Isso permite executar jobs Spark a partir de qualquer IDE ou notebook sem instalação local do Spark.

Checklist de tuning de performance para pipelines PySpark

Construir um pipeline que funciona é apenas parte do desafio. Torná-lo performático em escala exige atenção ao particionamento, ao caching e às estratégias de join.

| Técnica | Quando aplicar | Impacto | |-----------|---------------|--------| | Schemas explícitos | Toda operação de leitura | Elimina a varredura de inferência de schema | | Partition pruning | Filtragem em datasets particionados | Ignora completamente as partições irrelevantes | | Broadcast joins | Tabela pequena (< 10 MB) unida a uma grande | Evita shuffle custoso | | Caching | DataFrame reutilizado em várias ações | Evita recomputação | | Coalesce vs Repartition | Reduzir a quantidade de partições | coalesce evita shuffle completo | | AQE | Sempre (padrão no Spark 4.0) | Otimização em tempo de execução de joins e shuffles |

Monitorar a performance do pipeline pela UI do Spark permanece essencial. A visualização do DAG revela as fronteiras de shuffle e a aba SQL mostra o plano físico escolhido pelo otimizador Catalyst.

Conclusão

  • O PySpark 4.0 oferece uma API madura e voltada a Python para construir pipelines ETL em qualquer escala, com conformidade SQL ANSI habilitada por padrão
  • Definição explícita de schema, particionamento adequado e AQE eliminam os gargalos mais comuns em produção
  • A nova API Python Data Source remove a barreira Java/Scala para conectores customizados — APIs REST, formatos proprietários e fontes de streaming podem ser integrados em Python puro
  • A ordem das transformações importa: normalizar identificadores antes da deduplicação, parsear datas antes de extrair chaves de partição
  • As perguntas de entrevista de Data Engineering frequentemente abordam internals do Spark como avaliação preguiçosa, mecânica de shuffle e estratégias de particionamento — dominar esses fundamentos de pipeline prepara diretamente para essas conversas
  • Vale explorar os padrões ETL e ELT para uma cobertura mais profunda das decisões de arquitetura de pipelines

Comece a praticar!

Teste seus conhecimentos com nossos simuladores de entrevista e testes tecnicos.

Tags

#apache-spark
#pyspark
#data-pipeline
#etl
#python
#spark-4
#data-engineering

Compartilhar

Artigos relacionados