Apache Spark dengan Python: Membangun Pipeline Data Langkah demi Langkah

Tutorial PySpark untuk membangun pipeline data ETL dengan Spark 4.0. Panduan lengkap DataFrame, Python Data Source API, dan optimasi performa.

Ilustrasi tutorial pipeline data Apache Spark dengan Python menampilkan alur dan tahapan pemrosesan data

Apache Spark tetap menjadi mesin pemrosesan terdistribusi yang paling dominan untuk pipeline data berskala besar pada tahun 2026. Dikombinasikan dengan PySpark, Spark menawarkan API berbasis Python yang mampu menangani beban kerja batch maupun streaming di seluruh klaster tanpa mengorbankan performa.

Tutorial ini membahas secara lengkap proses pembangunan pipeline ETL menggunakan PySpark, mulai dari ingest data mentah hingga output yang bersih, dengan memanfaatkan fitur-fitur Spark 4.0 seperti Python Data Source API dan plotting DataFrame native.

Referensi Cepat

PySpark DataFrame bersifat immutable. Setiap transformasi menghasilkan DataFrame baru tanpa mengubah yang asli. Desain ini memungkinkan lazy evaluation pada Spark: transformasi hanya dieksekusi ketika sebuah action (seperti .show(), .write(), atau .collect()) memicu rencana komputasi.

Menyiapkan Lingkungan PySpark 4.0

Sebelum membangun pipeline apa pun, sesi Spark perlu dikonfigurasi dengan benar. Spark 4.0 mengaktifkan mode ANSI secara default, yang menerapkan semantik SQL yang lebih ketat -- overflow numerik kini akan melemparkan exception alih-alih melakukan wrapping secara diam-diam.

python
# spark_setup.py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ETLPipeline")
    .config("spark.sql.adaptive.enabled", "true")       # AQE for runtime optimization
    .config("spark.sql.shuffle.partitions", "200")       # Tune based on cluster size
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Verify Spark version
print(spark.version)  # 4.0.0

Adaptive Query Execution (AQE) yang diaktifkan di sini akan mengoptimasi shuffle partition dan strategi join secara dinamis saat runtime. Spark 4.0 menghadirkan peningkatan kecepatan 20-50% pada beban kerja ETL tipikal dibandingkan Spark 3.x, sebagian besar berkat peningkatan AQE dan optimizer Catalyst.

Membaca Data Mentah dengan DataFrame API

PySpark DataFrame mengabstraksikan komputasi terdistribusi di balik API tabular yang sudah familiar. Proses pembacaan dari berbagai sumber -- CSV, Parquet, JSON, database -- mengikuti pola yang konsisten.

python
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=False),
    StructField("product_code", StringType(), nullable=True),
    StructField("amount", DoubleType(), nullable=True),
    StructField("order_date", TimestampType(), nullable=True),
    StructField("region", StringType(), nullable=True),
])

# Read CSV with explicit schema
raw_orders = (
    spark.read
    .schema(order_schema)                    # Skip inference, enforce types
    .option("header", "true")                # First row contains column names
    .option("mode", "DROPMALFORMED")         # Skip rows that don't match schema
    .csv("s3a://data-lake/raw/orders/")
)

# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")

raw_orders.printSchema()
raw_orders.show(5, truncate=False)

Mendefinisikan skema secara eksplisit merupakan best practice dalam lingkungan produksi. Schema inference memerlukan pemindaian penuh terhadap sumber data, yang memakan biaya komputasi tinggi pada dataset besar dan berpotensi menebak tipe data secara keliru -- misalnya, menginterpretasikan kode pos sebagai integer.

Membersihkan dan Mentransformasi DataFrame

Data mentah selalu memerlukan pembersihan. Transformasi PySpark dapat dirangkai secara natural menggunakan DataFrame API, dan karena DataFrame bersifat immutable, setiap langkah menghasilkan DataFrame baru tanpa mengubah data asli.

python
# clean_orders.py
from pyspark.sql import functions as F

cleaned_orders = (
    raw_orders
    .filter(F.col("order_id").isNotNull())                     # Remove null primary keys
    .filter(F.col("amount") > 0)                               # Filter invalid amounts
    .withColumn("customer_id", F.trim(F.upper(F.col("customer_id"))))  # Normalize IDs
    .withColumn("order_date", F.to_date(F.col("order_date")))  # Cast to date type
    .withColumn("year_month",                                   # Extract partition key
        F.date_format(F.col("order_date"), "yyyy-MM"))
    .dropDuplicates(["order_id"])                               # Deduplicate by order ID
)

print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")

Urutan transformasi sangat menentukan hasil akhir. Normalisasi customer_id harus dilakukan sebelum deduplikasi agar pencocokan berjalan konsisten. Ekstraksi year_month dilakukan setelah parsing tanggal untuk menghindari nilai null pada kolom partisi.

Hindari collect() pada DataFrame Besar

Pemanggilan .collect() menarik seluruh DataFrame terdistribusi ke memori driver. Pada dataset yang melebihi kapasitas RAM driver, hal ini menyebabkan OutOfMemoryError. Gunakan .show(), .take(), atau .toPandas() pada subset yang sudah difilter terlebih dahulu.

Menggabungkan dan Mengagregasi dari Berbagai Sumber

Pipeline di dunia nyata menggabungkan data dari berbagai sumber. PySpark mendukung semua tipe join standar, dan AQE pada Spark 4.0 secara otomatis memilih broadcast join untuk tabel berukuran kecil tanpa perlu petunjuk manual.

python
# enrich_orders.py
# Join orders with product catalog
enriched = (
    cleaned_orders
    .join(products, on="product_code", how="left")      # Keep all orders, even unmatched
    .select(
        "order_id",
        "customer_id",
        "product_code",
        F.col("name").alias("product_name"),              # Rename for clarity
        "amount",
        F.col("category"),                                # From products table
        "order_date",
        "year_month",
    )
)

# Aggregate: monthly revenue per product category
monthly_revenue = (
    enriched
    .groupBy("year_month", "category")
    .agg(
        F.sum("amount").alias("total_revenue"),            # Sum all order amounts
        F.countDistinct("order_id").alias("order_count"),  # Unique orders
        F.avg("amount").alias("avg_order_value"),          # Average basket size
    )
    .orderBy(F.desc("total_revenue"))
)

monthly_revenue.show(10)

Join bertipe left mempertahankan semua pesanan meskipun kode produk tidak memiliki kecocokan di katalog -- skenario yang umum terjadi selama migrasi data atau ketika katalog produk tertinggal dari sistem transaksi.

Siap menguasai wawancara Data Engineering Anda?

Berlatih dengan simulator interaktif, flashcards, dan tes teknis kami.

Langkah terakhir dalam pipeline ETL adalah menulis data yang sudah ditransformasi ke lapisan penyimpanan target. Mempartisi output berdasarkan kolom yang sering di-query secara drastis mengurangi waktu baca bagi konsumen data di hilir.

python
# write_output.py
# Write enriched data partitioned by year_month
(
    enriched
    .repartition("year_month")                             # Align partitions with output
    .write
    .mode("overwrite")                                     # Replace existing partition data
    .partitionBy("year_month")                             # Physical directory partitioning
    .parquet("s3a://data-lake/curated/enriched_orders/")
)

# Write aggregated metrics as a single compact file
(
    monthly_revenue
    .coalesce(1)                                           # Single output file for small results
    .write
    .mode("overwrite")
    .parquet("s3a://data-lake/curated/monthly_revenue/")
)

print("Pipeline complete — output written to curated layer")

Penggunaan .repartition() sebelum .partitionBy() memastikan setiap partisi fisik berisi jumlah file yang wajar. Tanpa langkah ini, Spark dapat menghasilkan ratusan file kecil per partisi -- anti-pattern yang dikenal sebagai "small files problem" yang menurunkan performa baca pada HDFS maupun object store.

Spark 4.0: Python Data Source API untuk Konektor Kustom

Spark 4.0 memperkenalkan Python Data Source API, menghilangkan keharusan menulis konektor kustom dalam Java atau Scala seperti pada versi sebelumnya. Hal ini menyederhanakan integrasi dengan sistem proprietary, REST API, atau format file khusus.

python
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class APIDataSource(DataSource):
    """Custom data source reading from an internal REST API."""

    @classmethod
    def name(cls) -> str:
        return "internal_api"                              # Registered source name

    def schema(self) -> StructType:
        return StructType([                                # Define output schema
            StructField("id", IntegerType()),
            StructField("name", StringType()),
            StructField("status", StringType()),
        ])

    def reader(self, schema: StructType) -> "APIDataSourceReader":
        return APIDataSourceReader(self.options)           # Pass options to reader

class APIDataSourceReader(DataSourceReader):
    def __init__(self, options):
        self.endpoint = options.get("endpoint", "")        # API endpoint URL

    def read(self, partition):
        import requests                                    # Import inside read for serialization
        response = requests.get(self.endpoint)
        for record in response.json():
            yield (record["id"], record["name"], record["status"])

# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()

Python Data Source API mendukung pembacaan batch maupun streaming. Untuk penggunaan di lingkungan produksi, penambahan error handling di dalam method read() serta implementasi paralelisme di tingkat partisi secara signifikan meningkatkan throughput.

Spark Connect di Versi 4.0

Spark Connect memisahkan aplikasi client dari klaster Spark. Client PySpark yang ringan (pyspark-client) hanya berukuran 1,5 MB dibandingkan paket PySpark lengkap sebesar 355 MB. Fitur ini memungkinkan eksekusi job Spark dari IDE atau notebook mana pun tanpa instalasi Spark lokal.

Checklist Optimasi Performa untuk Pipeline PySpark

Membangun pipeline yang berfungsi hanyalah sebagian dari tantangan. Membuatnya berperforma optimal pada skala besar memerlukan perhatian khusus pada partitioning, caching, dan strategi join.

| Teknik | Kapan Diterapkan | Dampak | |--------|------------------|--------| | Skema eksplisit | Setiap operasi baca | Menghilangkan pemindaian schema inference | | Partition pruning | Saat memfilter dataset berpartisi | Melewati partisi data yang tidak relevan sepenuhnya | | Broadcast join | Tabel kecil (< 10 MB) di-join dengan tabel besar | Menghindari shuffle yang mahal | | Caching | DataFrame digunakan ulang dalam beberapa action | Mencegah komputasi ulang | | Coalesce vs Repartition | Mengurangi jumlah partisi | coalesce menghindari shuffle penuh | | AQE | Selalu aktif (default di Spark 4.0) | Optimasi runtime untuk join dan shuffle |

Pemantauan performa pipeline melalui Spark UI tetap menjadi keharusan. Visualisasi DAG menunjukkan batas-batas shuffle, dan tab SQL menampilkan rencana fisik yang dipilih oleh optimizer Catalyst.

Kesimpulan

  • PySpark 4.0 menyediakan API yang matang dan berpusat pada Python untuk membangun pipeline ETL di segala skala, dengan kepatuhan ANSI SQL yang aktif secara default
  • Definisi skema eksplisit, partitioning yang tepat, dan AQE mengatasi hambatan performa yang paling umum ditemui di pipeline produksi
  • Python Data Source API yang baru menghilangkan batasan Java/Scala untuk konektor kustom -- REST API, format proprietary, dan sumber streaming semuanya dapat diintegrasikan menggunakan Python murni
  • Urutan transformasi sangat penting: normalisasi identifier sebelum deduplikasi, parsing tanggal sebelum mengekstrak kunci partisi
  • Pertanyaan interview Data Engineering sering membahas internal Spark seperti lazy evaluation, mekanisme shuffle, dan strategi partisi -- memahami dasar-dasar pipeline ini secara langsung mempersiapkan kandidat untuk menghadapi pertanyaan tersebut
  • Pelajari lebih lanjut tentang pola ETL dan ELT untuk pembahasan mendalam mengenai keputusan arsitektur pipeline

Mulai berlatih!

Uji pengetahuan Anda dengan simulator wawancara dan tes teknis kami.

Tag

#apache-spark
#pyspark
#data-engineering
#etl
#python

Bagikan

Artikel terkait