Apache Airflow năm 2026: Điều phối Pipeline, DAG và Câu hỏi Phỏng vấn

Hướng dẫn Apache Airflow 3.2 dành cho kỹ sư dữ liệu: xây dựng DAG với Task SDK, điều phối pipeline dữ liệu, asset partition, async task và câu hỏi phỏng vấn thực tế cho năm 2026.

Apache Airflow pipeline orchestration DAGs tutorial 2026

Apache Airflow 3.2 đánh dấu bước tiến quan trọng nhất của nền tảng điều phối mã nguồn mở này kể từ lần tái cấu trúc kiến trúc lớn trong phiên bản 3.0. Với hơn 10 triệu lượt tải hàng tháng trên PyPI và được triển khai rộng rãi tại các tổ chức từ Airbnb đến Spotify, Airflow tiếp tục giữ vị trí công cụ hàng đầu trong việc xây dựng, lập lịch và giám sát pipeline dữ liệu. Bài viết này trình bày cách viết DAG với Task SDK mới, các mẫu điều phối pipeline, và những câu hỏi thường xuất hiện trong phỏng vấn kỹ sư dữ liệu.

Airflow 3.2 có gì mới

Airflow 3.2, phát hành tháng 4 năm 2026, giới thiệu asset partition cho lập lịch theo phân vùng dữ liệu, hỗ trợ async gốc trong PythonOperator, và triển khai đa nhóm (multi-team deployment). Toàn bộ import DAG hiện sử dụng namespace ổn định airflow.sdk được đưa vào từ Airflow 3.0.

Viết DAG với Airflow Task SDK

Airflow 3.0 đã giới thiệu Task SDK, một package độc lập tách biệt định nghĩa DAG khỏi các thành phần nội bộ của Airflow. Mục tiêu chính: viết DAG có tính di động cao, ổn định qua các phiên bản, và có thể nâng cấp Airflow mà không cần thay đổi mã nguồn. Tất cả các đối tượng cốt lõi -- DAG, dag, task, BaseOperator, Connection, Variable -- hiện nằm trong airflow.sdk.

Các đường dẫn import cũ (airflow.decorators.task, airflow.models.dag.DAG) vẫn hoạt động trong phiên bản 3.2 nhưng đã được đánh dấu deprecated và sẽ bị loại bỏ trong các bản phát hành tương lai.

python
# etl_daily_revenue.py
import pendulum
from airflow.sdk import dag, task

@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["finance", "etl"],
)
def etl_daily_revenue():
    @task()
    def extract_transactions() -> list[dict]:
        # Pulls raw transactions from the payments API
        import requests
        response = requests.get("https://api.internal/payments/daily")
        return response.json()["transactions"]

    @task(multiple_outputs=True)
    def transform(transactions: list[dict]) -> dict:
        # Aggregates by currency and computes totals
        totals = {}
        for tx in transactions:
            currency = tx["currency"]
            totals[currency] = totals.get(currency, 0) + tx["amount"]
        return {"totals": totals, "count": len(transactions)}

    @task()
    def load(totals: dict, count: int):
        # Inserts aggregated row into the warehouse
        from warehouse import insert_revenue_summary
        insert_revenue_summary(totals=totals, transaction_count=count)

    raw = extract_transactions()
    result = transform(raw)
    load(result["totals"], result["count"])

etl_daily_revenue()

Decorator @dag thay thế cú pháp context manager truyền thống with DAG(...). Decorator @task biến các hàm Python thuần túy thành Airflow task, với quá trình tuần tự hóa XCom được xử lý tự động. Khi gọi transform(raw), một dependency được khai báo ngầm -- Airflow xây dựng các cạnh DAG từ đồ thị lời gọi hàm.

Dynamic Task Mapping cho khối lượng công việc thay đổi

DAG tĩnh gặp vấn đề khi số lượng phần tử cần xử lý thay đổi giữa các lần chạy. Dynamic task mapping, ổn định từ Airflow 2.4 và được cải tiến trong dòng 3.x, giải quyết bài toán này bằng cách mở rộng task tại thời điểm chạy thông qua .expand().

python
# etl_multi_region.py
import pendulum
from airflow.sdk import dag, task

@dag(
    schedule="@hourly",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["regional", "etl"],
)
def etl_multi_region():
    @task()
    def get_regions() -> list[str]:
        # Returns the active regions from config
        return ["us-east-1", "eu-west-1", "ap-southeast-1"]

    @task()
    def process_region(region: str) -> dict:
        # Processes events for a single region
        from data_processor import aggregate_events
        return aggregate_events(region)

    @task()
    def merge_results(results: list[dict]):
        # Combines all regional outputs into a single report
        from reporter import publish_global_report
        publish_global_report(results)

    regions = get_regions()
    # .expand() creates one mapped task instance per region
    processed = process_region.expand(region=regions)
    merge_results(processed)

etl_multi_region()

Tại thời điểm thực thi, Airflow tạo ba instance song song của process_region -- mỗi instance xử lý một region. Nếu ngày mai xuất hiện thêm region thứ tư, không cần thay đổi mã DAG. Task merge_results chờ tất cả các mapped instance hoàn thành trước khi thực thi.

Giới hạn mapped task

Mặc định, Airflow giới hạn mapped task ở mức 1024 instance cho mỗi mapping. Có thể ghi đè bằng tham số max_map_length trong cấu hình DAG khi xử lý các tập dữ liệu lớn hơn.

Asset Partition: Lập lịch theo dữ liệu trong Airflow 3.2

Trước Airflow 3.2, lập lịch theo dữ liệu hoạt động ở cấp asset: khi một DAG upstream cập nhật asset, tất cả các DAG downstream sẽ được kích hoạt bất kể phần dữ liệu nào thay đổi. Asset partition khắc phục điều này bằng cách cho phép kiểm soát ở mức phân vùng.

Hãy xem xét tình huống trong đó ba DAG upstream tạo ra thống kê cầu thủ theo giờ cho các giải đấu thể thao khác nhau. DAG phân tích downstream chỉ nên được kích hoạt khi cả ba giải đấu đã xuất bản dữ liệu cho cùng một khung giờ.

python
# downstream_analytics.py
import pendulum
from airflow.sdk import dag, task
from airflow.timetables.assets import CronPartitionTimetable

# Define partitioned assets with hourly granularity
nba_stats = Asset("s3://datalake/nba/hourly/", partitions=CronPartitionTimetable("0 * * * *"))
epl_stats = Asset("s3://datalake/epl/hourly/", partitions=CronPartitionTimetable("0 * * * *"))
nfl_stats = Asset("s3://datalake/nfl/hourly/", partitions=CronPartitionTimetable("0 * * * *"))

@dag(
    # Triggers only when all three assets have matching partition
    schedule=(nba_stats & epl_stats & nfl_stats),
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
)
def unified_sports_analytics():
    @task()
    def aggregate_all_leagues(**context):
        partition = context["partition"]
        # Process only the specific hourly slice across all leagues
        from analytics import build_cross_league_report
        build_cross_league_report(partition=partition)

    aggregate_all_leagues()

unified_sports_analytics()

Lập lịch theo phân vùng loại bỏ các lần chạy pipeline thừa. DAG downstream chỉ kích hoạt khi đúng phân vùng cần thiết đã sẵn sàng trên tất cả các nguồn upstream -- không sớm hơn, không chạy trên dữ liệu chưa đầy đủ.

Sẵn sàng chinh phục phỏng vấn Data Engineering?

Luyện tập với mô phỏng tương tác, flashcards và bài kiểm tra kỹ thuật.

Async Task gốc cho tác vụ I/O nặng

Airflow 3.2 bổ sung hỗ trợ async gốc trong PythonOperator. Trước đây, việc chạy song song các thao tác I/O (hàng nghìn lời gọi API, tải file hàng loạt) đòi hỏi phải viết các deferrable operator tùy chỉnh. Giờ đây, chỉ cần truyền trực tiếp một hàm async.

python
# async_file_download.py
import asyncio
import aiohttp
from airflow.sdk import dag, task
import pendulum

@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["async", "download"],
)
def async_file_download():
    @task()
    async def download_files():
        urls = [f"https://data.source/files/{i}.csv" for i in range(500)]
        async with aiohttp.ClientSession() as session:
            # Downloads 500 files concurrently instead of sequentially
            tasks = [fetch_and_save(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        return {"downloaded": len(results)}

    download_files()

async def fetch_and_save(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as resp:
        content = await resp.read()
        filename = url.split("/")[-1]
        with open(f"/data/downloads/{filename}", "wb") as f:
            f.write(content)
        return filename

async_file_download()

Phương pháp async tải 500 file đồng thời thông qua một worker slot duy nhất, thay vì 500 lời gọi HTTP tuần tự trong phiên bản đồng bộ. Với các tác vụ bị giới hạn bởi I/O, cách tiếp cận này mang lại tốc độ nhanh hơn hàng chục lần mà không cần cung cấp thêm worker.

Kiến trúc Airflow: Các thành phần và luồng thực thi

Nắm vững kiến trúc của Airflow không chỉ cần thiết cho việc vận hành hệ thống trong môi trường production mà còn là chủ đề quan trọng trong các buổi phỏng vấn. Năm thành phần tương tác trong mỗi lần chạy DAG:

  • Scheduler -- Phân tích các file DAG, giải quyết dependency và đưa task vào hàng đợi để thực thi. Trong Airflow 3.x, scheduler hoạt động theo mô hình stateless với cơ sở dữ liệu metadata.
  • Executor -- Quyết định nơi task được chạy. LocalExecutor xử lý trên một máy duy nhất. CeleryExecutor phân phối task qua các worker node. KubernetesExecutor tạo một pod riêng cho mỗi task.
  • Worker -- Thực thi mã nguồn task. Với Task Execution API mới trong Airflow 3.0, worker giao tiếp thông qua một giao thức ổn định, cho phép thực thi trong container, môi trường edge hoặc runtime bên ngoài.
  • Metadata database -- PostgreSQL (khuyến nghị) hoặc MySQL. Lưu trữ định nghĩa DAG, trạng thái task, giá trị XCom, thông tin kết nối và nhật ký kiểm toán.
  • Web server -- Giao diện UI của Airflow để giám sát các lần chạy DAG, kiểm tra log, kích hoạt chạy thủ công và quản lý kết nối.
Lựa chọn executor cho production

Không nên sử dụng SequentialExecutor trong môi trường production -- nó chỉ chạy một task tại một thời điểm và chỉ tồn tại cho mục đích phát triển. Với các môi trường Kubernetes, KubernetesExecutor cung cấp mức độ cách ly mạnh nhất vì mỗi task nhận một pod riêng với tài nguyên và dependency độc lập.

So sánh Airflow, Prefect và Dagster năm 2026

Airflow cạnh tranh với hai giải pháp thay thế đáng chú ý. Lựa chọn phù hợp phụ thuộc vào quy mô nhóm, hạ tầng hiện có và loại pipeline đang xây dựng.

| Feature | Airflow 3.2 | Prefect 3.x | Dagster 1.9 | |---|---|---|---| | DAG definition | Python decorators (airflow.sdk) | Python decorators (@flow, @task) | Python decorators (@asset, @op) | | Scheduling | Cron, asset-aware, partition-aware | Cron, event-driven | Cron, sensor-based, asset-aware | | Execution model | Centralized scheduler + distributed workers | Hybrid (server + work pools) | Centralized dagster-daemon | | Dynamic tasks | .expand() mapped tasks | Native Python loops | Dynamic partitions | | Async support | Native in 3.2 | Native since 2.0 | Async I/O ops | | Multi-team isolation | Built-in (3.2 experimental) | Workspace-based (Cloud) | Branch deployments | | Community size | Largest (35k+ GitHub stars) | Growing (18k+ stars) | Growing (12k+ stars) | | Best fit | Complex, multi-team pipelines with established infrastructure | Smaller teams wanting fast iteration | Data asset-centric organizations |

Lợi thế của Airflow nằm ở hệ sinh thái: hơn 80 provider package hỗ trợ mọi dịch vụ cloud, cơ sở dữ liệu và API phổ biến. Prefect nổi bật về trải nghiệm lập trình viên với ít boilerplate hơn. Mô hình asset-centric của Dagster phù hợp với các nhóm tư duy theo hướng sản phẩm dữ liệu thay vì chuỗi task tuần tự.

Câu hỏi phỏng vấn: Apache Airflow cho kỹ sư dữ liệu

Dưới đây là những câu hỏi phản ánh nội dung mà các quản lý tuyển dụng và kỹ sư cấp cao thường đặt ra trong phỏng vấn kỹ sư dữ liệu tại các công ty vận hành Airflow trong môi trường production.

DAG là gì và Airflow sử dụng nó như thế nào?

DAG (Directed Acyclic Graph -- Đồ thị có hướng không chu trình) định nghĩa một workflow dưới dạng tập hợp các task với các dependency, đảm bảo không có tham chiếu vòng. Airflow phân tích các file Python trong thư mục dags/, xây dựng đồ thị dependency, và scheduler xác định thứ tự thực thi. Mỗi lần chạy DAG tạo ra một đối tượng DagRun gắn với một logical date. Ràng buộc "không có chu trình" đảm bảo scheduler luôn tìm được thứ tự thực thi hợp lệ -- task A chạy trước task B, không bao giờ xảy ra chiều ngược lại đồng thời.

XCom hoạt động như thế nào và khi nào nên tránh sử dụng?

XCom (cross-communication) truyền các phần dữ liệu nhỏ giữa các task. Một task đẩy giá trị trả về vào XCom; các task downstream kéo giá trị đó ra. Task SDK xử lý quá trình này tự động khi truyền giá trị trả về giữa các decorated task. XCom lưu trữ dữ liệu trong metadata database theo mặc định, nên không phù hợp cho các tập dữ liệu lớn (bất cứ thứ gì vượt quá vài KB). Khi cần truyền dữ liệu lớn, nên sử dụng bộ lưu trữ ngoài (S3, GCS) và chỉ truyền đường dẫn tham chiếu qua XCom.

Giải thích sự khác biệt giữa schedule, start_datecatchup.

Tham số schedule (đổi tên từ schedule_interval trong Airflow 3.x) xác định tần suất DAG chạy: chuỗi cron, timedelta, đối tượng timetable hoặc asset trigger. start_date thiết lập logical date sớm nhất mà một DAG run có thể được tạo. catchup=True (mặc định) tạo DAG run cho tất cả các khoảng thời gian bị bỏ lỡ giữa start_date và thời điểm hiện tại. Đặt catchup=False yêu cầu scheduler bỏ qua các khoảng thời gian trong quá khứ và chỉ lập lịch từ thời điểm hiện tại trở đi. Mẫu phổ biến trong production: đặt catchup=False cho các DAG vận hành và catchup=True cho backfill lịch sử.

KubernetesExecutor khác CeleryExecutor ở điểm nào?

CeleryExecutor duy trì một nhóm worker process chạy liên tục, kết nối qua message broker (Redis hoặc RabbitMQ). Task được đưa vào hàng đợi và thực thi trên các worker khả dụng. KubernetesExecutor tạo một pod Kubernetes mới cho mỗi task, sử dụng Docker image và yêu cầu tài nguyên riêng của task đó. Celery có độ trễ thấp hơn (không có chi phí khởi tạo pod) và hoạt động tốt với các workload đồng nhất. KubernetesExecutor cung cấp mức cách ly mạnh hơn, kiểm soát tài nguyên theo từng task và loại bỏ nhu cầu quản lý nhóm worker tĩnh -- lý tưởng cho các workload không đồng nhất, nơi các task có yêu cầu dependency và bộ nhớ khác nhau.

Có những chiến lược nào để xử lý lỗi task?

Airflow cung cấp nhiều cơ chế xử lý lỗi. retriesretry_delay cấu hình thử lại tự động với exponential backoff. on_failure_callback kích hoạt logic tùy chỉnh (cảnh báo Slack, sự cố PagerDuty) khi task thất bại. trigger_rule kiểm soát cách các task downstream phản ứng -- all_success (mặc định), one_success, all_failed hoặc none_failed_min_one_success. Với các sự cố hạ tầng tạm thời, tham số retry_exponential_backoff=True tăng thời gian chờ giữa các lần thử lại. SLA (hiện là Deadline Alerts trong 3.x) giám sát thời gian thực thi và kích hoạt callback khi task vượt quá thời gian chạy dự kiến.

Thực hành tốt nhất cho triển khai Airflow trong production

Vận hành Airflow ổn định ở quy mô lớn đòi hỏi sự chú ý đến một số mẫu vận hành vượt xa việc viết DAG đúng cách.

Task idempotent. Mỗi task cần tạo ra cùng một kết quả khi được thực thi nhiều lần với cùng đầu vào. Sử dụng INSERT ... ON CONFLICT hoặc MERGE thay vì INSERT đơn thuần. Phân vùng dữ liệu đầu ra theo logical date. Điều này đảm bảo việc thử lại và backfill an toàn mà không tạo dữ liệu trùng lặp.

DAG nhỏ, tập trung. Tránh xây dựng các DAG monolithic với hàng chục task. Chia pipeline phức tạp thành nhiều DAG kết nối thông qua asset (trước đây gọi là dataset). DAG nhỏ hơn được phân tích nhanh hơn, dễ debug hơn và cho phép khởi động lại từng phần pipeline.

Quản lý kết nối. Lưu trữ tất cả thông tin xác thực trong connection manager của Airflow hoặc backend bí mật bên ngoài (AWS Secrets Manager, HashiCorp Vault). Không bao giờ hardcode thông tin xác thực trong file DAG. Airflow 3.x mã hóa các trường connection khi lưu trữ trong metadata database.

Giám sát và cảnh báo. Xuất metric Airflow sang Prometheus hoặc StatsD. Theo dõi scheduler_heartbeat, dag_processing.total_parse_timeexecutor.queued_tasks. Airflow 3.2 bổ sung OpenTelemetry trace cho khả năng quan sát pipeline từ đầu đến cuối.

Chuẩn bị cho buổi phỏng vấn kỹ sư dữ liệu về điều phối pipeline? SharpSkill cung cấp các câu hỏi phỏng vấn Airflow và module thực hành phản ánh các tình huống thực tế từ môi trường production.

Bắt đầu luyện tập!

Kiểm tra kiến thức với mô phỏng phỏng vấn và bài kiểm tra kỹ thuật.

Kết luận

  • Airflow 3.2 giới thiệu Task SDK (airflow.sdk) làm API ổn định cho việc viết DAG -- nên chuyển đổi import ngay để tránh lỗi tương thích trong các bản phát hành tương lai
  • Asset partition cho phép lập lịch theo phân vùng dữ liệu, loại bỏ các lần kích hoạt pipeline thừa khi dữ liệu chỉ được cập nhật một phần
  • Hỗ trợ async gốc trong PythonOperator xử lý các tác vụ I/O nặng (tải file hàng loạt, fan-out API) mà không cần viết deferrable operator tùy chỉnh
  • Dynamic task mapping với .expand() tự động thích ứng với khối lượng workload thay đổi tại runtime mà không cần sửa mã DAG
  • Chuẩn bị phỏng vấn nên bao gồm cơ chế DAG, đánh đổi giữa các executor (Celery vs. Kubernetes), giới hạn XCom và các mẫu idempotency
  • Triển khai production cần DAG nhỏ với task idempotent, quản lý bí mật bên ngoài và xuất metric sang nền tảng quan sát

Bắt đầu luyện tập!

Kiểm tra kiến thức với mô phỏng phỏng vấn và bài kiểm tra kỹ thuật.

Thẻ

#Apache Airflow
#DAG
#Data Pipeline
#Data Engineering
#Python

Chia sẻ

Bài viết liên quan