Apache Spark com Python: construindo pipelines de dados passo a passo
Tutorial prático de PySpark cobrindo operações com DataFrame, construção de pipelines ETL e recursos do Spark 4.0. Inclui exemplos de código prontos para produção voltados a engenheiros de dados que se preparam para entrevistas técnicas.

O Apache Spark continua sendo o motor de processamento distribuído dominante para pipelines de dados em larga escala em 2026. Combinado com o PySpark, oferece uma API nativa em Python que lida com cargas batch e streaming em clusters de qualquer tamanho sem comprometer a performance.
Este tutorial percorre a construção de um pipeline ETL completo com PySpark, da ingestão bruta à saída limpa, usando recursos do Spark 4.0 como a API Python Data Source e o plotting nativo sobre DataFrames.
Os DataFrames do PySpark são imutáveis. Cada transformação retorna um novo DataFrame sem alterar o original. Esse desenho habilita a avaliação preguiçosa do Spark: as transformações só são executadas quando uma ação (.show(), .write() ou .collect()) dispara o plano de cômputo.
Configurando um ambiente PySpark 4.0
Antes de construir qualquer pipeline, a sessão Spark exige configuração adequada. O Spark 4.0 ativa o modo ANSI por padrão, o que impõe uma semântica SQL mais estrita — estouros numéricos agora lançam exceções em vez de serem silenciosamente truncados.
# spark_setup.py
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("ETLPipeline")
.config("spark.sql.adaptive.enabled", "true") # AQE for runtime optimization
.config("spark.sql.shuffle.partitions", "200") # Tune based on cluster size
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
)
# Verify Spark version
print(spark.version) # 4.0.0O Adaptive Query Execution (AQE), habilitado aqui, otimiza dinamicamente as partições de shuffle e as estratégias de join em tempo de execução. O Spark 4.0 traz ganhos de 20 a 50% em cargas ETL típicas em relação ao Spark 3.x, em grande parte pelas melhorias do AQE e do otimizador Catalyst.
Lendo dados brutos com a API DataFrame
Os DataFrames do PySpark abstraem a computação distribuída por trás de uma API tabular familiar. A leitura de diferentes fontes — CSV, Parquet, JSON, bancos de dados — segue um padrão consistente.
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
StructField("order_id", StringType(), nullable=False),
StructField("customer_id", StringType(), nullable=False),
StructField("product_code", StringType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
StructField("order_date", TimestampType(), nullable=True),
StructField("region", StringType(), nullable=True),
])
# Read CSV with explicit schema
raw_orders = (
spark.read
.schema(order_schema) # Skip inference, enforce types
.option("header", "true") # First row contains column names
.option("mode", "DROPMALFORMED") # Skip rows that don't match schema
.csv("s3a://data-lake/raw/orders/")
)
# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")
raw_orders.printSchema()
raw_orders.show(5, truncate=False)Definir o schema explicitamente é uma boa prática de produção. A inferência de schema exige uma varredura completa da fonte, o que é caro em datasets grandes e pode errar nos tipos (interpretar CEPs como inteiros, por exemplo).
Limpando e transformando DataFrames
Dados brutos sempre precisam de limpeza. As transformações do PySpark encadeiam-se naturalmente com a API DataFrame e, como os DataFrames são imutáveis, cada etapa produz um novo DataFrame sem modificar o original.
# clean_orders.py
from pyspark.sql import functions as F
cleaned_orders = (
raw_orders
.filter(F.col("order_id").isNotNull()) # Remove null primary keys
.filter(F.col("amount") > 0) # Filter invalid amounts
.withColumn("customer_id", F.trim(F.upper(F.col("customer_id")))) # Normalize IDs
.withColumn("order_date", F.to_date(F.col("order_date"))) # Cast to date type
.withColumn("year_month", # Extract partition key
F.date_format(F.col("order_date"), "yyyy-MM"))
.dropDuplicates(["order_id"]) # Deduplicate by order ID
)
print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")A ordem importa em cadeias de transformação. Normalizar customer_id antes da deduplicação garante correspondência consistente. Extrair year_month após o parsing da data evita valores nulos na coluna de partição.
Chamar .collect() traz o DataFrame distribuído inteiro para a memória do driver. Em datasets que excedem a RAM do driver, isso causa OutOfMemoryError. Prefira .show(), .take() ou .toPandas() sobre subconjuntos previamente filtrados.
Juntando e agregando múltiplas fontes
Pipelines reais combinam dados de várias fontes. O PySpark suporta todos os tipos de join padrão, e o AQE do Spark 4.0 seleciona automaticamente broadcast joins para tabelas pequenas sem hints manuais.
# enrich_orders.py
# Join orders with product catalog
enriched = (
cleaned_orders
.join(products, on="product_code", how="left") # Keep all orders, even unmatched
.select(
"order_id",
"customer_id",
"product_code",
F.col("name").alias("product_name"), # Rename for clarity
"amount",
F.col("category"), # From products table
"order_date",
"year_month",
)
)
# Aggregate: monthly revenue per product category
monthly_revenue = (
enriched
.groupBy("year_month", "category")
.agg(
F.sum("amount").alias("total_revenue"), # Sum all order amounts
F.countDistinct("order_id").alias("order_count"), # Unique orders
F.avg("amount").alias("avg_order_value"), # Average basket size
)
.orderBy(F.desc("total_revenue"))
)
monthly_revenue.show(10)O join left preserva todos os pedidos mesmo quando um código de produto não tem correspondência no catálogo — cenário comum durante migrações de dados ou quando o catálogo de produtos fica atrás dos sistemas transacionais.
Pronto para mandar bem nas entrevistas de Data Engineering?
Pratique com nossos simuladores interativos, flashcards e testes tecnicos.
Escrevendo saída otimizada com particionamento
O passo final de um pipeline ETL grava os dados transformados em uma camada de armazenamento de destino. Particionar a saída por colunas frequentemente consultadas reduz drasticamente os tempos de leitura para os consumidores downstream.
# write_output.py
# Write enriched data partitioned by year_month
(
enriched
.repartition("year_month") # Align partitions with output
.write
.mode("overwrite") # Replace existing partition data
.partitionBy("year_month") # Physical directory partitioning
.parquet("s3a://data-lake/curated/enriched_orders/")
)
# Write aggregated metrics as a single compact file
(
monthly_revenue
.coalesce(1) # Single output file for small results
.write
.mode("overwrite")
.parquet("s3a://data-lake/curated/monthly_revenue/")
)
print("Pipeline complete — output written to curated layer")Usar .repartition() antes de .partitionBy() garante que cada partição física contenha um número razoável de arquivos. Sem isso, o Spark pode produzir centenas de arquivos minúsculos por partição — um antipadrão conhecido como "small files problem" que degrada a leitura em HDFS e object stores.
Spark 4.0: a API Python Data Source para conectores sob medida
O Spark 4.0 introduz a API Python Data Source, eliminando a antiga obrigatoriedade de escrever conectores customizados em Java ou Scala. Isso simplifica a integração com sistemas proprietários, APIs REST ou formatos de arquivo de nicho.
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
class APIDataSource(DataSource):
"""Custom data source reading from an internal REST API."""
@classmethod
def name(cls) -> str:
return "internal_api" # Registered source name
def schema(self) -> StructType:
return StructType([ # Define output schema
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("status", StringType()),
])
def reader(self, schema: StructType) -> "APIDataSourceReader":
return APIDataSourceReader(self.options) # Pass options to reader
class APIDataSourceReader(DataSourceReader):
def __init__(self, options):
self.endpoint = options.get("endpoint", "") # API endpoint URL
def read(self, partition):
import requests # Import inside read for serialization
response = requests.get(self.endpoint)
for record in response.json():
yield (record["id"], record["name"], record["status"])
# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()A API Python Data Source suporta leituras batch e streaming. Para uso em produção, adicionar tratamento de erros dentro de read() e implementar paralelismo em nível de partição melhora significativamente o throughput.
O Spark Connect desacopla aplicações cliente do cluster Spark. O cliente PySpark leve (pyspark-client) pesa apenas 1,5 MB contra os 355 MB do pacote PySpark completo. Isso permite executar jobs Spark a partir de qualquer IDE ou notebook sem instalação local do Spark.
Checklist de tuning de performance para pipelines PySpark
Construir um pipeline que funciona é apenas parte do desafio. Torná-lo performático em escala exige atenção ao particionamento, ao caching e às estratégias de join.
| Técnica | Quando aplicar | Impacto |
|-----------|---------------|--------|
| Schemas explícitos | Toda operação de leitura | Elimina a varredura de inferência de schema |
| Partition pruning | Filtragem em datasets particionados | Ignora completamente as partições irrelevantes |
| Broadcast joins | Tabela pequena (< 10 MB) unida a uma grande | Evita shuffle custoso |
| Caching | DataFrame reutilizado em várias ações | Evita recomputação |
| Coalesce vs Repartition | Reduzir a quantidade de partições | coalesce evita shuffle completo |
| AQE | Sempre (padrão no Spark 4.0) | Otimização em tempo de execução de joins e shuffles |
Monitorar a performance do pipeline pela UI do Spark permanece essencial. A visualização do DAG revela as fronteiras de shuffle e a aba SQL mostra o plano físico escolhido pelo otimizador Catalyst.
Conclusão
- O PySpark 4.0 oferece uma API madura e voltada a Python para construir pipelines ETL em qualquer escala, com conformidade SQL ANSI habilitada por padrão
- Definição explícita de schema, particionamento adequado e AQE eliminam os gargalos mais comuns em produção
- A nova API Python Data Source remove a barreira Java/Scala para conectores customizados — APIs REST, formatos proprietários e fontes de streaming podem ser integrados em Python puro
- A ordem das transformações importa: normalizar identificadores antes da deduplicação, parsear datas antes de extrair chaves de partição
- As perguntas de entrevista de Data Engineering frequentemente abordam internals do Spark como avaliação preguiçosa, mecânica de shuffle e estratégias de particionamento — dominar esses fundamentos de pipeline prepara diretamente para essas conversas
- Vale explorar os padrões ETL e ELT para uma cobertura mais profunda das decisões de arquitetura de pipelines
Comece a praticar!
Teste seus conhecimentos com nossos simuladores de entrevista e testes tecnicos.
Tags
Compartilhar
Artigos relacionados

ETL vs ELT em 2026: Arquitetura de Pipelines de Dados Explicada
Comparação completa entre arquiteturas ETL e ELT para pipelines de dados em 2026, incluindo análise de custos, performance, exemplos de código e critérios de decisão para engenheiros de dados.

Top 25 Perguntas de Entrevista para Engenharia de Dados em 2026
Guia completo com as 25 perguntas mais relevantes para entrevistas de engenharia de dados em 2026. Inclui SQL, Spark, Kafka, ETL/ELT, modelagem de dados e design de pipelines.