Python ile Apache Spark: Adim Adim Veri Hatlari Olusturma

DataFrame islemleri, ETL veri hatti yapimi ve Spark 4.0 ozelliklerini kapsayan uygulamali PySpark rehberi. Teknik mülakatlara hazirlanan veri mühendisleri icin üretime hazir kod ornekleri icerir.

Python ile Apache Spark: Veri Hatlari Olusturma

Apache Spark, 2026 yilinda büyük ölcekli veri hatlari icin baskin dagitik isleme motoru olmaya devam etmektedir. PySpark ile birlestirildiginde, performanstan ödün vermeden her boyuttaki kümelerde toplu ve akis islemlerini yöneten Python-native bir API sunmaktadir.

Bu rehber, ham veri alinmasindan temiz ciktiya kadar PySpark ile eksiksiz bir ETL veri hatti olusturma sürecini adim adim anlatmaktadir. Spark 4.0 ile gelen Python Data Source API ve yerel DataFrame grafikleri gibi yeni özellikler de ele alinmaktadir.

Hizli Basvuru

PySpark DataFrame'leri degismezdir (immutable). Her dönüsüm yeni bir DataFrame döndürür ve orijinali degistirmeden birakir. Bu tasarim, Spark'in tembel degerlendirmesini (lazy evaluation) mümkün kilar: dönüsümler yalnizca bir eylem (.show(), .write() veya .collect() gibi) hesaplama planini tetiklediginde yürütülür.

PySpark 4.0 Ortaminin Kurulumu

Herhangi bir veri hatti olusturmadan önce Spark oturumunun dogru sekilde yapilandirilmasi gerekmektedir. Spark 4.0 varsayilan olarak ANSI modunu etkinlestirir ve daha katli SQL semantigi uygular — sayisal tasma durumlari artik sessizce deger sarmak yerine istisna firlatmaktadir.

python
# spark_setup.py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ETLPipeline")
    .config("spark.sql.adaptive.enabled", "true")       # AQE for runtime optimization
    .config("spark.sql.shuffle.partitions", "200")       # Tune based on cluster size
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Verify Spark version
print(spark.version)  # 4.0.0

Yukaridaki kodda etkinlestirilen Adaptive Query Execution (AQE), calisma zamaninda shuffle bölümlerini ve birlestirme stratejilerini dinamik olarak optimize eder. Spark 4.0, AQE iyilestirmeleri ve Catalyst optimize edici sayesinde tipik ETL is yüklerinde Spark 3.x'e kiyasla %20-50 hiz artisi saglamaktadir.

DataFrame API ile Ham Verilerin Okunmasi

PySpark DataFrame'leri, dagitik hesaplamayi tanidik bir tablo arayüzünün arkasinda soyutlar. CSV, Parquet, JSON ve veritabanlari gibi cesitli kaynaklardan okuma tutarli bir kalip izlemektedir.

python
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=False),
    StructField("product_code", StringType(), nullable=True),
    StructField("amount", DoubleType(), nullable=True),
    StructField("order_date", TimestampType(), nullable=True),
    StructField("region", StringType(), nullable=True),
])

# Read CSV with explicit schema
raw_orders = (
    spark.read
    .schema(order_schema)                    # Skip inference, enforce types
    .option("header", "true")                # First row contains column names
    .option("mode", "DROPMALFORMED")         # Skip rows that don't match schema
    .csv("s3a://data-lake/raw/orders/")
)

# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")

raw_orders.printSchema()
raw_orders.show(5, truncate=False)

Acik sema tanimlamasi, üretim ortami icin en iyi uygulama yöntemidir. Sema cikarimi, veri kaynaginin tam bir taramasini gerektirir; bu büyük veri kümelerinde pahali bir islemdir ve tipleri yanlis tahmin edebilir (örnegin posta kodlarini tam sayi olarak yorumlamak gibi).

DataFrame Temizleme ve Dönüstürme

Ham veriler her zaman temizlik gerektirir. PySpark dönüsümleri DataFrame API kullanilarak dogal bir sekilde zincirlenir ve DataFrame'ler degismez oldugu icin her adim orijinali degistirmeden yeni bir DataFrame üretir.

python
# clean_orders.py
from pyspark.sql import functions as F

cleaned_orders = (
    raw_orders
    .filter(F.col("order_id").isNotNull())                     # Remove null primary keys
    .filter(F.col("amount") > 0)                               # Filter invalid amounts
    .withColumn("customer_id", F.trim(F.upper(F.col("customer_id"))))  # Normalize IDs
    .withColumn("order_date", F.to_date(F.col("order_date")))  # Cast to date type
    .withColumn("year_month",                                   # Extract partition key
        F.date_format(F.col("order_date"), "yyyy-MM"))
    .dropDuplicates(["order_id"])                               # Deduplicate by order ID
)

print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")

Dönüsüm sirasi önemlidir. customer_id normalizasyonunun tekilsizlestirme (deduplication) isleminden önce yapilmasi tutarli eslemeyi garanti eder. year_month cikariminin tarih ayristirma sonrasinda yapilmasi, bölüm sütununda null degerlerden kacinmayi saglar.

Büyük DataFrame'lerde collect() Kullanmaktan Kacinin

.collect() cagrisi, dagitik DataFrame'in tamamini sürücünün bellegine ceker. Sürücünün RAM'ini asan veri kümelerinde bu islem OutOfMemoryError'a neden olur. Bunun yerine önceden filtrelenmis alt kümeler üzerinde .show(), .take() veya .toPandas() kullanilmalidir.

Birden Fazla Kaynaktan Veri Birlestirme ve Toplama

Gercek veri hatlari birden fazla kaynaktan gelen verileri birlestirir. PySpark tüm standart birlestirme türlerini destekler ve Spark 4.0'in AQE'si küçük tablolar icin manuel ipuclari olmadan otomatik olarak broadcast join secer.

python
# enrich_orders.py
# Join orders with product catalog
enriched = (
    cleaned_orders
    .join(products, on="product_code", how="left")      # Keep all orders, even unmatched
    .select(
        "order_id",
        "customer_id",
        "product_code",
        F.col("name").alias("product_name"),              # Rename for clarity
        "amount",
        F.col("category"),                                # From products table
        "order_date",
        "year_month",
    )
)

# Aggregate: monthly revenue per product category
monthly_revenue = (
    enriched
    .groupBy("year_month", "category")
    .agg(
        F.sum("amount").alias("total_revenue"),            # Sum all order amounts
        F.countDistinct("order_id").alias("order_count"),  # Unique orders
        F.avg("amount").alias("avg_order_value"),          # Average basket size
    )
    .orderBy(F.desc("total_revenue"))
)

monthly_revenue.show(10)

left birlestirmesi, bir ürün kodunun katalogda karsiligi olmasa bile tüm siparisleri korur — veri göcü sirasinda veya ürün kataloglarinin islem sistemlerinin gerisinde kaldigi durumlarda yaygin bir senaryodur.

Data Engineering mülakatlarında başarılı olmaya hazır mısın?

İnteraktif simülatörler, flashcards ve teknik testlerle pratik yap.

Bölümleme ile Optimize Edilmis Cikti Yazma

Bir ETL veri hattindaki son adim, dönüstürülmüs verileri hedef depolama katmanina yazmaktir. Ciktinin sik sorgulanan sütunlara göre bölümlenmesi, alt akis tüketicileri icin okuma sürelerini önemli ölcüde azaltir.

python
# write_output.py
# Write enriched data partitioned by year_month
(
    enriched
    .repartition("year_month")                             # Align partitions with output
    .write
    .mode("overwrite")                                     # Replace existing partition data
    .partitionBy("year_month")                             # Physical directory partitioning
    .parquet("s3a://data-lake/curated/enriched_orders/")
)

# Write aggregated metrics as a single compact file
(
    monthly_revenue
    .coalesce(1)                                           # Single output file for small results
    .write
    .mode("overwrite")
    .parquet("s3a://data-lake/curated/monthly_revenue/")
)

print("Pipeline complete — output written to curated layer")

.partitionBy() öncesinde .repartition() kullanmak, her fiziksel bölümün makul sayida dosya icermesini saglar. Bu olmadan Spark, bölüm basina yüzlerce küçük dosya üretebilir — HDFS ve nesne depolarinda okuma performansini düsüren "küçük dosya problemi" olarak bilinen yaygin bir anti-kaliptir.

Spark 4.0: Özel Konektörler icin Python Data Source API

Spark 4.0, özel konektörlerin Java veya Scala'da yazilmasi gerekliligi ortadan kaldiran Python Data Source API'yi tanitir. Bu, tescilli sistemler, REST API'leri ve nis dosya formatlariyla entegrasyonu kolaylastirir.

python
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class APIDataSource(DataSource):
    """Custom data source reading from an internal REST API."""

    @classmethod
    def name(cls) -> str:
        return "internal_api"                              # Registered source name

    def schema(self) -> StructType:
        return StructType([                                # Define output schema
            StructField("id", IntegerType()),
            StructField("name", StringType()),
            StructField("status", StringType()),
        ])

    def reader(self, schema: StructType) -> "APIDataSourceReader":
        return APIDataSourceReader(self.options)           # Pass options to reader

class APIDataSourceReader(DataSourceReader):
    def __init__(self, options):
        self.endpoint = options.get("endpoint", "")        # API endpoint URL

    def read(self, partition):
        import requests                                    # Import inside read for serialization
        response = requests.get(self.endpoint)
        for record in response.json():
            yield (record["id"], record["name"], record["status"])

# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()

Python Data Source API hem toplu hem de akis okumalarini destekler. Üretim kullaniminda, read() icinde hata islemesi eklenmesi ve bölüm düzeyinde paralellik uygulanmasi verim kapasitesini önemli ölcüde artirmaktadir.

Spark 4.0'da Spark Connect

Spark Connect, istemci uygulamalarini Spark kümesinden ayirir. Hafif PySpark istemcisi (pyspark-client) tam 355 MB'lik PySpark paketine kiyasla yalnizca 1,5 MB agirligindadir. Bu, yerel bir Spark kurulumu olmadan herhangi bir IDE'den veya not defterinden Spark islerinin calistirilmasina olanak tanir.

PySpark Veri Hatlari icin Performans Ayari Kontrol Listesi

Calisan bir veri hatti olusturmak zorlugun yalnizca bir parçasidir. Büyük ölcekte performans elde etmek, bölümleme, önbellekleme ve birlestirme stratejilerine dikkat edilmesini gerektirir.

| Teknik | Ne Zaman Uygulanir | Etki | |--------|---------------------|------| | Acik semalar | Her okuma isleminde | Sema cikarim taramasini ortadan kaldirir | | Bölüm budama | Bölümlenmis veri kümeleri filtrelenirken | Ilgisiz veri bölümlerini tamamen atlar | | Broadcast join | Küçük tablo (< 10 MB) büyük tabloyla birlestirilirken | Pahali shuffle isleminden kacinir | | Önbellekleme | DataFrame birden fazla eylemde yeniden kullanildiginda | Yeniden hesaplamayi önler | | Coalesce vs Repartition | Bölüm sayisi azaltilirken | coalesce tam shuffle'dan kacinir | | AQE | Her zaman (Spark 4.0'da varsayilan) | Birlestirme ve shuffle islemlerinin calisma zamani optimizasyonu |

Spark UI üzerinden veri hatti performansinin izlenmesi vazgecilmez olmaya devam etmektedir. DAG görselestirmesi shuffle sinirlarini ortaya koyar ve SQL sekmesi Catalyst optimize edici tarafindan secilen fiziksel plani gösterir.

Sonuc

  • PySpark 4.0, varsayilan olarak ANSI SQL uyumlulugu ile her ölcekte ETL veri hatti olusturmak icin olgun, Python-öncelikli bir API sunmaktadir
  • Acik sema tanimlama, dogru bölümleme ve AQE, üretim veri hatlarindaki en yaygin performans darbogazlarini ortadan kaldirir
  • Yeni Python Data Source API, özel konektörler icin Java/Scala engelini ortadan kaldirir — REST API'leri, tescilli formatlar ve akis kaynaklari tamamen saf Python ile entegre edilebilir
  • Dönüsüm sirasi önemlidir: tanimlayicilari tekilsizlestirmeden önce normalize edin, bölüm anahtarlarini cikarmadan önce tarihleri ayristirin
  • Veri mühendisligi mülakat sorulari genellikle tembel degerlendirme, shuffle mekanikleri ve bölümleme stratejileri gibi Spark ic yapilarini kapsar — bu veri hatti temellerini anlamak, bu görüsmelere dogrudan hazirlayici niteliktedir
  • Veri hatti mimarisi kararlari hakkinda daha derin bilgi icin ETL ve ELT kaliplari incelenmelidir

Pratik yapmaya başla!

Mülakat simülatörleri ve teknik testlerle bilgini test et.

Etiketler

#apache-spark
#pyspark
#data-pipeline
#etl
#python
#spark-4
#data-engineering

Paylaş

İlgili makaleler