Apache Spark와 Python으로 데이터 파이프라인 구축하기: 단계별 실전 가이드
PySpark을 활용한 실전 데이터 파이프라인 구축 튜토리얼입니다. DataFrame 연산, ETL 파이프라인 설계, Spark 4.0의 주요 기능을 프로덕션 수준의 코드 예제와 함께 설명합니다. 데이터 엔지니어 기술 면접 준비에도 도움이 됩니다.

Apache Spark는 2026년 현재에도 대규모 데이터 파이프라인을 위한 분산 처리 엔진으로 확고한 위치를 차지하고 있습니다. PySpark과 결합하면 어떤 규모의 클러스터에서든 성능 저하 없이 배치 및 스트리밍 워크로드를 처리할 수 있는 Python 네이티브 API를 제공합니다.
이 튜토리얼에서는 PySpark을 사용하여 원시 데이터 수집부터 정제된 결과물 저장까지, 완전한 ETL 데이터 파이프라인을 구축하는 과정을 다룹니다. Python Data Source API와 네이티브 DataFrame 시각화 등 Spark 4.0의 최신 기능도 함께 활용합니다.
PySpark DataFrame은 불변(immutable) 객체입니다. 모든 변환(transformation)은 원본을 수정하지 않고 새로운 DataFrame을 반환합니다. 이러한 설계 덕분에 Spark의 지연 평가(lazy evaluation)가 가능합니다. 변환은 .show(), .write(), .collect()와 같은 액션(action)이 호출될 때만 실행 계획이 트리거됩니다.
PySpark 4.0 환경 설정
파이프라인을 구축하기 전에 Spark 세션을 올바르게 설정해야 합니다. Spark 4.0은 기본적으로 ANSI 모드가 활성화되어 있으며, 더 엄격한 SQL 시맨틱을 적용합니다. 예를 들어 숫자 오버플로가 발생하면 이전처럼 자동으로 값이 순환되지 않고 예외가 발생합니다.
# spark_setup.py
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("ETLPipeline")
.config("spark.sql.adaptive.enabled", "true") # AQE for runtime optimization
.config("spark.sql.shuffle.partitions", "200") # Tune based on cluster size
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
)
# Verify Spark version
print(spark.version) # 4.0.0위 코드에서 활성화한 적응형 쿼리 실행(AQE, Adaptive Query Execution)은 런타임에 셔플 파티션과 조인 전략을 동적으로 최적화합니다. Spark 4.0은 AQE 개선과 Catalyst 옵티마이저 향상을 통해 일반적인 ETL 워크로드에서 Spark 3.x 대비 20~50%의 성능 향상을 제공합니다.
DataFrame API를 활용한 원시 데이터 읽기
PySpark DataFrame은 분산 연산을 친숙한 테이블 형태의 API로 추상화합니다. CSV, Parquet, JSON, 데이터베이스 등 다양한 소스에서 데이터를 읽을 때 일관된 패턴을 따릅니다.
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
StructField("order_id", StringType(), nullable=False),
StructField("customer_id", StringType(), nullable=False),
StructField("product_code", StringType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
StructField("order_date", TimestampType(), nullable=True),
StructField("region", StringType(), nullable=True),
])
# Read CSV with explicit schema
raw_orders = (
spark.read
.schema(order_schema) # Skip inference, enforce types
.option("header", "true") # First row contains column names
.option("mode", "DROPMALFORMED") # Skip rows that don't match schema
.csv("s3a://data-lake/raw/orders/")
)
# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")
raw_orders.printSchema()
raw_orders.show(5, truncate=False)명시적 스키마 정의는 프로덕션 환경의 필수 모범 사례입니다. 스키마 추론(schema inference)은 데이터 소스 전체를 스캔해야 하므로 대용량 데이터셋에서 비용이 많이 들고, 타입을 잘못 추정할 수 있습니다. 예를 들어 우편번호를 정수형으로 인식하는 경우가 이에 해당합니다.
DataFrame 정제 및 변환
원시 데이터에는 항상 정제 작업이 필요합니다. PySpark의 변환은 DataFrame API를 사용하여 자연스럽게 체이닝할 수 있으며, DataFrame이 불변 객체이므로 각 단계는 원본을 수정하지 않고 새로운 DataFrame을 생성합니다.
# clean_orders.py
from pyspark.sql import functions as F
cleaned_orders = (
raw_orders
.filter(F.col("order_id").isNotNull()) # Remove null primary keys
.filter(F.col("amount") > 0) # Filter invalid amounts
.withColumn("customer_id", F.trim(F.upper(F.col("customer_id")))) # Normalize IDs
.withColumn("order_date", F.to_date(F.col("order_date"))) # Cast to date type
.withColumn("year_month", # Extract partition key
F.date_format(F.col("order_date"), "yyyy-MM"))
.dropDuplicates(["order_id"]) # Deduplicate by order ID
)
print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")변환 체인에서 순서는 매우 중요합니다. 중복 제거 전에 customer_id를 정규화해야 일관된 매칭이 보장됩니다. 또한 날짜를 파싱한 후에 year_month를 추출해야 파티션 컬럼에 null 값이 발생하지 않습니다.
.collect()를 호출하면 분산된 DataFrame 전체가 드라이버의 메모리로 로드됩니다. 드라이버의 RAM을 초과하는 데이터셋에서는 OutOfMemoryError가 발생합니다. 대신 .show(), .take(), 또는 사전에 필터링한 하위 집합에 대해 .toPandas()를 사용하는 것이 안전합니다.
다중 소스 조인 및 집계
실제 파이프라인에서는 여러 소스의 데이터를 결합하는 작업이 필수적입니다. PySpark은 모든 표준 조인 유형을 지원하며, Spark 4.0의 AQE는 수동 힌트 없이도 소규모 테이블에 대해 자동으로 브로드캐스트 조인을 선택합니다.
# enrich_orders.py
# Join orders with product catalog
enriched = (
cleaned_orders
.join(products, on="product_code", how="left") # Keep all orders, even unmatched
.select(
"order_id",
"customer_id",
"product_code",
F.col("name").alias("product_name"), # Rename for clarity
"amount",
F.col("category"), # From products table
"order_date",
"year_month",
)
)
# Aggregate: monthly revenue per product category
monthly_revenue = (
enriched
.groupBy("year_month", "category")
.agg(
F.sum("amount").alias("total_revenue"), # Sum all order amounts
F.countDistinct("order_id").alias("order_count"), # Unique orders
F.avg("amount").alias("avg_order_value"), # Average basket size
)
.orderBy(F.desc("total_revenue"))
)
monthly_revenue.show(10)left 조인은 상품 카탈로그에 일치하는 상품 코드가 없더라도 모든 주문을 보존합니다. 이는 데이터 마이그레이션 과정이나 상품 카탈로그가 트랜잭션 시스템보다 업데이트가 느린 경우에 흔히 발생하는 시나리오입니다.
Data Engineering 면접 준비가 되셨나요?
인터랙티브 시뮬레이터, flashcards, 기술 테스트로 연습하세요.
파티셔닝을 활용한 최적화된 출력 저장
ETL 파이프라인의 최종 단계는 변환된 데이터를 대상 스토리지 레이어에 저장하는 것입니다. 자주 조회되는 컬럼을 기준으로 출력을 파티셔닝하면 하류 소비자의 읽기 시간이 크게 단축됩니다.
# write_output.py
# Write enriched data partitioned by year_month
(
enriched
.repartition("year_month") # Align partitions with output
.write
.mode("overwrite") # Replace existing partition data
.partitionBy("year_month") # Physical directory partitioning
.parquet("s3a://data-lake/curated/enriched_orders/")
)
# Write aggregated metrics as a single compact file
(
monthly_revenue
.coalesce(1) # Single output file for small results
.write
.mode("overwrite")
.parquet("s3a://data-lake/curated/monthly_revenue/")
)
print("Pipeline complete — output written to curated layer").partitionBy() 앞에 .repartition()을 사용하면 각 물리적 파티션에 적절한 수의 파일이 포함됩니다. 이를 생략하면 Spark가 파티션당 수백 개의 작은 파일을 생성할 수 있으며, 이는 HDFS와 오브젝트 스토어에서 읽기 성능을 저하시키는 대표적인 안티패턴인 "소규모 파일 문제(small files problem)"를 초래합니다.
Spark 4.0: 커스텀 커넥터를 위한 Python Data Source API
Spark 4.0에서는 Python Data Source API가 도입되어, 이전에 Java나 Scala로 작성해야 했던 커스텀 커넥터를 Python으로 구현할 수 있게 되었습니다. 이를 통해 자체 시스템, REST API, 비표준 파일 형식과의 통합이 크게 간소화되었습니다.
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
class APIDataSource(DataSource):
"""Custom data source reading from an internal REST API."""
@classmethod
def name(cls) -> str:
return "internal_api" # Registered source name
def schema(self) -> StructType:
return StructType([ # Define output schema
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("status", StringType()),
])
def reader(self, schema: StructType) -> "APIDataSourceReader":
return APIDataSourceReader(self.options) # Pass options to reader
class APIDataSourceReader(DataSourceReader):
def __init__(self, options):
self.endpoint = options.get("endpoint", "") # API endpoint URL
def read(self, partition):
import requests # Import inside read for serialization
response = requests.get(self.endpoint)
for record in response.json():
yield (record["id"], record["name"], record["status"])
# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()Python Data Source API는 배치와 스트리밍 읽기를 모두 지원합니다. 프로덕션 환경에서는 read() 내부에 에러 핸들링을 추가하고, 파티션 수준의 병렬 처리를 구현하면 처리량이 크게 향상됩니다.
Spark Connect는 클라이언트 애플리케이션을 Spark 클러스터에서 분리합니다. 경량 PySpark 클라이언트(pyspark-client)의 크기는 불과 1.5MB로, 전체 PySpark 패키지(355MB)에 비해 매우 가볍습니다. 이를 통해 로컬에 Spark을 설치하지 않고도 모든 IDE나 노트북에서 Spark 작업을 실행할 수 있습니다.
PySpark 파이프라인 성능 튜닝 체크리스트
작동하는 파이프라인을 구축하는 것은 절반에 불과합니다. 대규모 환경에서 성능을 확보하려면 파티셔닝, 캐싱, 조인 전략에 대한 세밀한 관리가 필요합니다.
| 기법 | 적용 시점 | 효과 |
|-----------|---------------|--------|
| 명시적 스키마 | 모든 읽기 연산 | 스키마 추론 스캔 제거 |
| 파티션 프루닝 | 파티셔닝된 데이터셋 필터링 시 | 불필요한 데이터 파티션 전체 건너뛰기 |
| 브로드캐스트 조인 | 소규모 테이블(10MB 미만)과 대규모 테이블 조인 시 | 비용이 큰 셔플 방지 |
| 캐싱 | DataFrame이 여러 액션에서 재사용될 때 | 재연산 방지 |
| Coalesce vs Repartition | 파티션 수 줄이기 | coalesce는 전체 셔플 방지 |
| AQE | 항상 적용 (Spark 4.0 기본값) | 조인과 셔플의 런타임 최적화 |
Spark UI를 통한 파이프라인 성능 모니터링은 여전히 필수적입니다. DAG 시각화에서 셔플 경계를 확인할 수 있으며, SQL 탭에서는 Catalyst 옵티마이저가 선택한 물리적 실행 계획을 확인할 수 있습니다.
결론
- PySpark 4.0은 모든 규모의 ETL 파이프라인을 구축하기 위한 완성도 높은 Python 기반 API를 제공하며, ANSI SQL 호환성이 기본적으로 활성화되어 있습니다
- 명시적 스키마 정의, 적절한 파티셔닝, AQE 활용은 프로덕션 파이프라인에서 가장 흔한 성능 병목 현상을 제거합니다
- 새로운 Python Data Source API는 커스텀 커넥터 개발 시 Java/Scala의 장벽을 제거합니다. REST API, 자체 파일 형식, 스트리밍 소스 모두 순수 Python으로 통합할 수 있습니다
- 변환 순서가 중요합니다. 중복 제거 전에 식별자를 정규화하고, 파티션 키를 추출하기 전에 날짜를 파싱해야 합니다
- 데이터 엔지니어링 면접 질문에서는 지연 평가, 셔플 메커니즘, 파티션 전략 등 Spark 내부 동작을 자주 다룹니다. 이 파이프라인의 기본 원리를 이해하면 면접 준비에 직접적으로 도움이 됩니다
- ETL 및 ELT 패턴에서 파이프라인 아키텍처 결정에 대한 심화 내용을 확인할 수 있습니다
연습을 시작하세요!
면접 시뮬레이터와 기술 테스트로 지식을 테스트하세요.
태그
공유
관련 기사

2026 ETL vs ELT 완벽 비교: 데이터 파이프라인 아키텍처 설계 가이드
2026년 ETL과 ELT의 핵심 차이점, 비용 분석, 구현 패턴을 상세히 비교합니다. dbt, Airflow를 활용한 실전 데이터 파이프라인 설계 방법을 알아보세요.

2026년 데이터 엔지니어링 면접 질문 상위 25개
2026년 데이터 엔지니어링 면접에서 가장 많이 출제되는 25가지 핵심 질문과 실무 중심의 답변을 제공합니다.

2026년 데이터 애널리틱스 면접 질문 TOP 25
2026년 데이터 애널리틱스 면접 대비 가이드입니다. SQL, Python, Power BI, 통계, 행동 면접에서 자주 출제되는 25개 질문을 코드 예시와 함께 상세히 해설합니다.