Apache Spark với Python: Xây dựng Data Pipeline từng bước
Hướng dẫn xây dựng data pipeline ETL với PySpark 4.0 - từ đọc dữ liệu thô, làm sạch DataFrame đến tối ưu hiệu suất. Kèm ví dụ code thực tế.

Apache Spark vẫn là engine xử lý phân tán chiếm ưu thế cho các data pipeline quy mô lớn trong năm 2026. Khi kết hợp với PySpark, nền tảng này cung cấp API thuần Python xử lý cả workload batch lẫn streaming trên cụm máy chủ ở bất kỳ quy mô nào mà không hy sinh hiệu suất.
Bài hướng dẫn này trình bày toàn bộ quy trình xây dựng một data pipeline ETL hoàn chỉnh với PySpark, từ giai đoạn đọc dữ liệu thô đến xuất kết quả đã qua xử lý, sử dụng các tính năng mới của Spark 4.0 như Python Data Source API và vẽ biểu đồ DataFrame tích hợp sẵn.
PySpark DataFrame là bất biến (immutable). Mỗi phép biến đổi trả về một DataFrame mới, giữ nguyên DataFrame gốc. Thiết kế này cho phép Spark thực hiện đánh giá lười (lazy evaluation): các phép biến đổi chỉ được thực thi khi một hành động (action) như .show(), .write() hoặc .collect() kích hoạt kế hoạch tính toán.
Thiết lập môi trường PySpark 4.0
Trước khi xây dựng bất kỳ pipeline nào, Spark session cần được cấu hình phù hợp. Spark 4.0 bật chế độ ANSI mặc định, áp dụng ngữ nghĩa SQL nghiêm ngặt hơn -- tràn số (numeric overflow) giờ đây ném ngoại lệ thay vì âm thầm cuộn vòng giá trị.
# spark_setup.py
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("ETLPipeline")
.config("spark.sql.adaptive.enabled", "true") # AQE for runtime optimization
.config("spark.sql.shuffle.partitions", "200") # Tune based on cluster size
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
)
# Verify Spark version
print(spark.version) # 4.0.0Adaptive Query Execution (AQE) được kích hoạt ở đây sẽ tự động tối ưu số lượng shuffle partition và chiến lược join tại thời điểm chạy. Spark 4.0 mang lại cải thiện tốc độ 20-50% trên các workload ETL điển hình so với Spark 3.x, chủ yếu nhờ các cải tiến AQE và trình tối ưu Catalyst.
Đọc dữ liệu thô với DataFrame API
PySpark DataFrame trừu tượng hóa toàn bộ tính toán phân tán đằng sau API dạng bảng quen thuộc. Việc đọc từ nhiều nguồn khác nhau -- CSV, Parquet, JSON, cơ sở dữ liệu -- đều tuân theo một mẫu nhất quán.
# read_sources.py
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Define schema explicitly — avoids slow schema inference on large files
order_schema = StructType([
StructField("order_id", StringType(), nullable=False),
StructField("customer_id", StringType(), nullable=False),
StructField("product_code", StringType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
StructField("order_date", TimestampType(), nullable=True),
StructField("region", StringType(), nullable=True),
])
# Read CSV with explicit schema
raw_orders = (
spark.read
.schema(order_schema) # Skip inference, enforce types
.option("header", "true") # First row contains column names
.option("mode", "DROPMALFORMED") # Skip rows that don't match schema
.csv("s3a://data-lake/raw/orders/")
)
# Read Parquet — schema is embedded in the file format
products = spark.read.parquet("s3a://data-lake/raw/products/")
raw_orders.printSchema()
raw_orders.show(5, truncate=False)Định nghĩa schema tường minh là phương pháp tốt nhất trong môi trường production. Schema inference đòi hỏi quét toàn bộ nguồn dữ liệu -- thao tác tốn kém trên tập dữ liệu lớn và có thể suy luận sai kiểu dữ liệu (ví dụ nhận diện mã bưu chính thành số nguyên).
Làm sạch và biến đổi DataFrame
Dữ liệu thô luôn cần được làm sạch. Các phép biến đổi PySpark kết nối tự nhiên thông qua DataFrame API, và vì DataFrame là bất biến, mỗi bước tạo ra một DataFrame mới mà không thay đổi bản gốc.
# clean_orders.py
from pyspark.sql import functions as F
cleaned_orders = (
raw_orders
.filter(F.col("order_id").isNotNull()) # Remove null primary keys
.filter(F.col("amount") > 0) # Filter invalid amounts
.withColumn("customer_id", F.trim(F.upper(F.col("customer_id")))) # Normalize IDs
.withColumn("order_date", F.to_date(F.col("order_date"))) # Cast to date type
.withColumn("year_month", # Extract partition key
F.date_format(F.col("order_date"), "yyyy-MM"))
.dropDuplicates(["order_id"]) # Deduplicate by order ID
)
print(f"Raw: {raw_orders.count()} rows -> Cleaned: {cleaned_orders.count()} rows")Thứ tự trong chuỗi biến đổi rất quan trọng. Chuẩn hóa customer_id trước khi loại trùng đảm bảo so khớp nhất quán. Trích xuất year_month sau khi phân tích ngày tháng tránh giá trị null trong cột phân vùng.
Gọi .collect() sẽ kéo toàn bộ DataFrame phân tán vào bộ nhớ của driver. Trên tập dữ liệu vượt quá RAM của driver, thao tác này gây lỗi OutOfMemoryError. Thay vào đó, hãy sử dụng .show(), .take() hoặc .toPandas() trên các tập con đã được lọc trước.
Kết hợp và tổng hợp từ nhiều nguồn dữ liệu
Các pipeline thực tế kết hợp dữ liệu từ nhiều nguồn khác nhau. PySpark hỗ trợ tất cả các kiểu join tiêu chuẩn, và AQE trong Spark 4.0 tự động chọn broadcast join cho các bảng nhỏ mà không cần gợi ý thủ công.
# enrich_orders.py
# Join orders with product catalog
enriched = (
cleaned_orders
.join(products, on="product_code", how="left") # Keep all orders, even unmatched
.select(
"order_id",
"customer_id",
"product_code",
F.col("name").alias("product_name"), # Rename for clarity
"amount",
F.col("category"), # From products table
"order_date",
"year_month",
)
)
# Aggregate: monthly revenue per product category
monthly_revenue = (
enriched
.groupBy("year_month", "category")
.agg(
F.sum("amount").alias("total_revenue"), # Sum all order amounts
F.countDistinct("order_id").alias("order_count"), # Unique orders
F.avg("amount").alias("avg_order_value"), # Average basket size
)
.orderBy(F.desc("total_revenue"))
)
monthly_revenue.show(10)Phép join left giữ lại tất cả đơn hàng ngay cả khi mã sản phẩm không có trong danh mục -- tình huống phổ biến trong quá trình di chuyển dữ liệu hoặc khi danh mục sản phẩm chưa được cập nhật kịp với hệ thống giao dịch.
Sẵn sàng chinh phục phỏng vấn Data Engineering?
Luyện tập với mô phỏng tương tác, flashcards và bài kiểm tra kỹ thuật.
Ghi dữ liệu đầu ra tối ưu với phân vùng
Bước cuối cùng trong pipeline ETL là ghi dữ liệu đã biến đổi ra tầng lưu trữ đích. Phân vùng đầu ra theo các cột thường xuyên được truy vấn giúp giảm đáng kể thời gian đọc cho các hệ thống tiêu thụ dữ liệu phía sau.
# write_output.py
# Write enriched data partitioned by year_month
(
enriched
.repartition("year_month") # Align partitions with output
.write
.mode("overwrite") # Replace existing partition data
.partitionBy("year_month") # Physical directory partitioning
.parquet("s3a://data-lake/curated/enriched_orders/")
)
# Write aggregated metrics as a single compact file
(
monthly_revenue
.coalesce(1) # Single output file for small results
.write
.mode("overwrite")
.parquet("s3a://data-lake/curated/monthly_revenue/")
)
print("Pipeline complete — output written to curated layer")Sử dụng .repartition() trước .partitionBy() đảm bảo mỗi phân vùng vật lý chứa số lượng file hợp lý. Nếu bỏ qua bước này, Spark có thể tạo ra hàng trăm file nhỏ trong mỗi phân vùng -- một anti-pattern phổ biến gọi là "small files problem" làm giảm hiệu suất đọc trên HDFS và object store.
Spark 4.0: Python Data Source API cho connector tuỳ chỉnh
Spark 4.0 giới thiệu Python Data Source API, loại bỏ yêu cầu phải viết connector tuỳ chỉnh bằng Java hoặc Scala như trước đây. Tính năng này đơn giản hóa việc tích hợp với các hệ thống độc quyền, REST API hoặc các định dạng file đặc thù.
# custom_source.py
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
class APIDataSource(DataSource):
"""Custom data source reading from an internal REST API."""
@classmethod
def name(cls) -> str:
return "internal_api" # Registered source name
def schema(self) -> StructType:
return StructType([ # Define output schema
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("status", StringType()),
])
def reader(self, schema: StructType) -> "APIDataSourceReader":
return APIDataSourceReader(self.options) # Pass options to reader
class APIDataSourceReader(DataSourceReader):
def __init__(self, options):
self.endpoint = options.get("endpoint", "") # API endpoint URL
def read(self, partition):
import requests # Import inside read for serialization
response = requests.get(self.endpoint)
for record in response.json():
yield (record["id"], record["name"], record["status"])
# Register and use the custom source
spark.dataSource.register(APIDataSource)
api_df = spark.read.format("internal_api").option("endpoint", "https://api.internal/users").load()
api_df.show()Python Data Source API hỗ trợ cả đọc batch lẫn streaming. Trong môi trường production, việc bổ sung xử lý lỗi bên trong read() và triển khai song song ở cấp partition sẽ cải thiện đáng kể thông lượng.
Spark Connect tách rời ứng dụng client khỏi cụm Spark. Client PySpark nhẹ (pyspark-client) chỉ nặng 1.5 MB so với gói PySpark đầy đủ 355 MB. Điều này cho phép chạy Spark job từ bất kỳ IDE hay notebook nào mà không cần cài đặt Spark cục bộ.
Danh sách kiểm tra tối ưu hiệu suất cho pipeline PySpark
Xây dựng một pipeline hoạt động đúng mới chỉ là một phần của thử thách. Đảm bảo hiệu suất ở quy mô lớn đòi hỏi sự chú ý đến phân vùng, bộ nhớ đệm và chiến lược join.
| Kỹ thuật | Thời điểm áp dụng | Tác động |
|----------|-------------------|----------|
| Schema tường minh | Mỗi thao tác đọc dữ liệu | Loại bỏ quét suy luận schema |
| Partition pruning | Lọc trên tập dữ liệu đã phân vùng | Bỏ qua hoàn toàn các phân vùng không liên quan |
| Broadcast join | Bảng nhỏ (< 10 MB) join với bảng lớn | Tránh shuffle tốn kém |
| Caching | DataFrame được tái sử dụng trong nhiều action | Ngăn tính toán lại |
| Coalesce vs Repartition | Giảm số lượng partition | coalesce tránh full shuffle |
| AQE | Luôn bật (mặc định trong Spark 4.0) | Tối ưu join và shuffle tại runtime |
Giám sát hiệu suất pipeline thông qua Spark UI vẫn là bước không thể thiếu. Biểu đồ DAG cho thấy các ranh giới shuffle, và tab SQL hiển thị kế hoạch thực thi vật lý mà trình tối ưu Catalyst đã chọn.
Kết luận
- PySpark 4.0 cung cấp API thuần Python hoàn thiện để xây dựng pipeline ETL ở mọi quy mô, với ANSI SQL compliance được bật mặc định
- Định nghĩa schema tường minh, phân vùng hợp lý và AQE giải quyết những nút thắt hiệu suất phổ biến nhất trong pipeline production
- Python Data Source API mới xóa bỏ rào cản Java/Scala cho connector tuỳ chỉnh -- REST API, định dạng độc quyền và nguồn streaming đều có thể tích hợp bằng Python thuần túy
- Thứ tự biến đổi có ý nghĩa quan trọng: chuẩn hóa định danh trước khi loại trùng, phân tích ngày tháng trước khi trích xuất khóa phân vùng
- Các câu hỏi phỏng vấn Data Engineering thường đề cập đến nội bộ Spark như lazy evaluation, cơ chế shuffle và chiến lược phân vùng -- nắm vững các khái niệm pipeline cơ bản này giúp chuẩn bị trực tiếp cho những buổi phỏng vấn đó
- Tìm hiểu thêm về mô hình ETL và ELT để hiểu sâu hơn các quyết định kiến trúc pipeline
Bắt đầu luyện tập!
Kiểm tra kiến thức với mô phỏng phỏng vấn và bài kiểm tra kỹ thuật.
Thẻ
Chia sẻ
Bài viết liên quan

ETL vs ELT năm 2026: Kiến trúc Data Pipeline và So sánh Chi tiết
So sánh chi tiết giữa ETL và ELT trong data engineering. Tìm hiểu kiến trúc data pipeline, ưu điểm và nhược điểm của từng phương pháp, và cách chọn giải pháp phù hợp năm 2026.

Top 25 Câu Hỏi Phỏng Vấn Data Engineering Năm 2026
25 câu hỏi phỏng vấn data engineering được hỏi nhiều nhất năm 2026, bao gồm SQL, data pipeline, ETL/ELT, Spark, Kafka, data modeling và system design kèm lời giải chi tiết.

Top 25 Câu Hỏi Phỏng Vấn Data Analytics Năm 2026
Hướng dẫn chi tiết 25 câu hỏi phỏng vấn Data Analyst thường gặp năm 2026, bao gồm SQL, Python, thống kê, visualization và behavioral questions kèm code examples.