Apache Airflow di Tahun 2026: Orkestrasi Pipeline, DAG, dan Pertanyaan Wawancara

Panduan lengkap Apache Airflow 3.2 untuk data engineer: pembuatan DAG dengan Task SDK, pola orkestrasi pipeline data, asset partition, task async native, dan pertanyaan wawancara kerja dengan contoh kode praktis untuk tahun 2026.

Tutorial orkestrasi pipeline Apache Airflow DAG 2026

Apache Airflow 3.2 merupakan evolusi paling signifikan dari platform orkestrasi open-source ini sejak perombakan arsitektur besar di versi 3.0. Dengan lebih dari 10 juta unduhan bulanan di PyPI dan adopsi di berbagai perusahaan mulai dari Airbnb hingga Spotify, Airflow tetap menjadi alat dominan untuk membangun, menjadwalkan, dan memantau pipeline data. Tutorial ini membahas pembuatan DAG menggunakan Task SDK terbaru, pola orkestrasi pipeline, serta pertanyaan-pertanyaan yang kerap muncul dalam wawancara kerja data engineering.

Sekilas tentang Airflow 3.2

Airflow 3.2, yang dirilis pada April 2026, memperkenalkan asset partition untuk penjadwalan data-aware yang lebih granular, dukungan async native pada PythonOperator, dan deployment multi-tim. Seluruh import DAG kini menggunakan namespace stabil airflow.sdk yang diperkenalkan di Airflow 3.0.

Pembuatan DAG dengan Airflow Task SDK

Airflow 3.0 memperkenalkan Task SDK, sebuah paket mandiri yang memisahkan definisi DAG dari internal Airflow. Tujuannya adalah menulis DAG yang portabel dan stabil terhadap perubahan versi, sehingga tetap berfungsi meskipun Airflow diperbarui tanpa perlu mengubah kode. Seluruh objek inti — DAG, dag, task, BaseOperator, Connection, Variable — kini berada di bawah airflow.sdk.

Path import lama (airflow.decorators.task, airflow.models.dag.DAG) masih berfungsi di versi 3.2, tetapi sudah ditandai sebagai deprecated dan akan dihapus di rilis mendatang.

python
# etl_daily_revenue.py
import pendulum
from airflow.sdk import dag, task

@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["finance", "etl"],
)
def etl_daily_revenue():
    @task()
    def extract_transactions() -> list[dict]:
        # Pulls raw transactions from the payments API
        import requests
        response = requests.get("https://api.internal/payments/daily")
        return response.json()["transactions"]

    @task(multiple_outputs=True)
    def transform(transactions: list[dict]) -> dict:
        # Aggregates by currency and computes totals
        totals = {}
        for tx in transactions:
            currency = tx["currency"]
            totals[currency] = totals.get(currency, 0) + tx["amount"]
        return {"totals": totals, "count": len(transactions)}

    @task()
    def load(totals: dict, count: int):
        # Inserts aggregated row into the warehouse
        from warehouse import insert_revenue_summary
        insert_revenue_summary(totals=totals, transaction_count=count)

    raw = extract_transactions()
    result = transform(raw)
    load(result["totals"], result["count"])

etl_daily_revenue()

Dekorator @dag menggantikan context manager tradisional with DAG(...). Dekorator @task mengubah fungsi Python biasa menjadi task Airflow, dengan serialisasi XCom yang ditangani secara otomatis. Pemanggilan transform(raw) mendeklarasikan dependensi — Airflow membangun edge DAG dari graf pemanggilan fungsi tersebut.

Dynamic Task Mapping untuk Beban Kerja Variabel

DAG statis tidak mampu mengakomodasi jumlah item yang berubah antar eksekusi. Dynamic task mapping, yang telah stabil sejak Airflow 2.4 dan terus disempurnakan di versi 3.x, menyelesaikan masalah ini dengan mengekspansi task secara runtime menggunakan .expand().

python
# etl_multi_region.py
import pendulum
from airflow.sdk import dag, task

@dag(
    schedule="@hourly",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["regional", "etl"],
)
def etl_multi_region():
    @task()
    def get_regions() -> list[str]:
        # Returns the active regions from config
        return ["us-east-1", "eu-west-1", "ap-southeast-1"]

    @task()
    def process_region(region: str) -> dict:
        # Processes events for a single region
        from data_processor import aggregate_events
        return aggregate_events(region)

    @task()
    def merge_results(results: list[dict]):
        # Combines all regional outputs into a single report
        from reporter import publish_global_report
        publish_global_report(results)

    regions = get_regions()
    # .expand() creates one mapped task instance per region
    processed = process_region.expand(region=regions)
    merge_results(processed)

etl_multi_region()

Pada saat eksekusi, Airflow membuat tiga instance paralel dari process_region — satu per region. Apabila region keempat ditambahkan esok hari, tidak diperlukan perubahan kode DAG sama sekali. Task merge_results menunggu seluruh mapped instance selesai sebelum dieksekusi.

Batas mapped task

Secara default, Airflow membatasi mapped task hingga 1024 instance per mapping. Batas ini dapat diubah melalui parameter max_map_length dalam konfigurasi DAG untuk memproses dataset yang lebih besar.

Asset Partition: Penjadwalan Data-Aware di Airflow 3.2

Sebelum Airflow 3.2, penjadwalan data-aware beroperasi pada level asset secara keseluruhan: ketika DAG producer memperbarui sebuah asset, semua DAG consumer terpicu tanpa memperhatikan bagian data mana yang berubah. Asset partition memperbaiki hal ini dengan memungkinkan granularitas pada level partisi.

Perhatikan skenario berikut: tiga DAG upstream menghasilkan statistik pemain per jam untuk liga olahraga yang berbeda. DAG downstream untuk analitik seharusnya hanya terpicu ketika ketiga liga telah mempublikasikan data untuk jam yang sama.

python
# downstream_analytics.py
import pendulum
from airflow.sdk import dag, task
from airflow.timetables.assets import CronPartitionTimetable

# Define partitioned assets with hourly granularity
nba_stats = Asset("s3://datalake/nba/hourly/", partitions=CronPartitionTimetable("0 * * * *"))
epl_stats = Asset("s3://datalake/epl/hourly/", partitions=CronPartitionTimetable("0 * * * *"))
nfl_stats = Asset("s3://datalake/nfl/hourly/", partitions=CronPartitionTimetable("0 * * * *"))

@dag(
    # Triggers only when all three assets have matching partition
    schedule=(nba_stats & epl_stats & nfl_stats),
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
)
def unified_sports_analytics():
    @task()
    def aggregate_all_leagues(**context):
        partition = context["partition"]
        # Process only the specific hourly slice across all leagues
        from analytics import build_cross_league_report
        build_cross_league_report(partition=partition)

    aggregate_all_leagues()

unified_sports_analytics()

Penjadwalan berbasis partisi mengeliminasi eksekusi pipeline yang redundan. DAG downstream hanya dijalankan ketika partisi spesifik yang dibutuhkan telah tersedia dari seluruh sumber upstream — tidak sebelumnya, dan tidak pada data yang belum lengkap.

Siap menguasai wawancara Data Engineering Anda?

Berlatih dengan simulator interaktif, flashcards, dan tes teknis kami.

Task Async Native untuk Beban Kerja I/O-Heavy

Airflow 3.2 menambahkan dukungan async native pada PythonOperator. Sebelumnya, menjalankan operasi I/O secara bersamaan (ribuan pemanggilan API, pengunduhan file secara batch) memerlukan penulisan custom deferrable operator. Kini, cukup dengan mengoper fungsi async secara langsung.

python
# async_file_download.py
import asyncio
import aiohttp
from airflow.sdk import dag, task
import pendulum

@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["async", "download"],
)
def async_file_download():
    @task()
    async def download_files():
        urls = [f"https://data.source/files/{i}.csv" for i in range(500)]
        async with aiohttp.ClientSession() as session:
            # Downloads 500 files concurrently instead of sequentially
            tasks = [fetch_and_save(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        return {"downloaded": len(results)}

    download_files()

async def fetch_and_save(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as resp:
        content = await resp.read()
        filename = url.split("/")[-1]
        with open(f"/data/downloads/{filename}", "wb") as f:
            f.write(content)
        return filename

async_file_download()

Pendekatan async mengunduh 500 file secara bersamaan melalui satu slot worker, dibandingkan dengan 500 pemanggilan HTTP sekuensial pada versi sinkron. Untuk beban kerja yang didominasi I/O, hal ini menghasilkan peningkatan kecepatan berlipat ganda tanpa perlu menyediakan worker tambahan.

Arsitektur Airflow: Komponen dan Alur Eksekusi

Memahami arsitektur Airflow sangat penting baik untuk mengoperasikannya di lingkungan produksi maupun menjawab pertanyaan wawancara. Lima komponen berinteraksi dalam setiap DAG run:

  • Scheduler — Mem-parsing file DAG, menyelesaikan dependensi, dan mengantrekan task untuk dieksekusi. Di Airflow 3.x, scheduler beroperasi secara stateless terhadap metadata database.
  • Executor — Menentukan di mana task dijalankan. LocalExecutor menangani setup mesin tunggal. CeleryExecutor mendistribusikan task ke node worker. KubernetesExecutor membuat pod baru untuk setiap task.
  • Worker — Mengeksekusi kode task yang sebenarnya. Dengan Task Execution API yang diperkenalkan di Airflow 3.0, worker berkomunikasi melalui kontrak yang stabil, memungkinkan eksekusi di container, lingkungan edge, maupun runtime eksternal.
  • Metadata database — PostgreSQL (direkomendasikan) atau MySQL. Menyimpan definisi DAG, status task, nilai XCom, kredensial koneksi, dan log audit.
  • Web server — Antarmuka UI Airflow untuk memantau DAG run, memeriksa log, memicu eksekusi manual, dan mengelola koneksi.
Pemilihan executor untuk produksi

Hindari SequentialExecutor di lingkungan produksi — executor ini hanya menjalankan satu task pada satu waktu dan hanya ditujukan untuk pengembangan. Untuk lingkungan berbasis Kubernetes, KubernetesExecutor memberikan isolasi terkuat karena setiap task mendapatkan pod-nya sendiri dengan sumber daya dan dependensi yang independen.

Airflow vs. Prefect vs. Dagster di Tahun 2026

Airflow bersaing dengan dua alternatif signifikan. Pilihan yang tepat bergantung pada ukuran tim, infrastruktur yang sudah ada, dan tipe pipeline yang dibangun.

| Feature | Airflow 3.2 | Prefect 3.x | Dagster 1.9 | |---|---|---|---| | DAG definition | Python decorators (airflow.sdk) | Python decorators (@flow, @task) | Python decorators (@asset, @op) | | Scheduling | Cron, asset-aware, partition-aware | Cron, event-driven | Cron, sensor-based, asset-aware | | Execution model | Centralized scheduler + distributed workers | Hybrid (server + work pools) | Centralized dagster-daemon | | Dynamic tasks | .expand() mapped tasks | Native Python loops | Dynamic partitions | | Async support | Native in 3.2 | Native since 2.0 | Async I/O ops | | Multi-team isolation | Built-in (3.2 experimental) | Workspace-based (Cloud) | Branch deployments | | Community size | Largest (35k+ GitHub stars) | Growing (18k+ stars) | Growing (12k+ stars) | | Best fit | Complex, multi-team pipelines with established infrastructure | Smaller teams wanting fast iteration | Data asset-centric organizations |

Keunggulan Airflow terletak pada ekosistemnya: lebih dari 80 provider package yang mencakup setiap layanan cloud utama, database, dan API. Prefect unggul dalam pengalaman developer dengan boilerplate yang lebih sedikit. Model asset-centric milik Dagster cocok untuk tim yang berpikir dalam konteks produk data, bukan urutan task.

Pertanyaan Wawancara: Apache Airflow untuk Data Engineer

Pertanyaan-pertanyaan berikut mencerminkan apa yang ditanyakan oleh hiring manager dan senior engineer selama wawancara data engineering di perusahaan yang menjalankan Airflow di lingkungan produksi.

Apa itu DAG, dan bagaimana Airflow menggunakannya?

DAG (Directed Acyclic Graph) mendefinisikan alur kerja sebagai kumpulan task dengan dependensi, yang dijamin tidak memiliki referensi sirkular. Airflow mem-parsing file Python di folder dags/, membangun graf dependensi, dan scheduler menentukan urutan eksekusi. Setiap DAG run membuat objek DagRun yang terikat pada tanggal logis tertentu. Constraint "acyclic" memastikan scheduler selalu dapat menemukan urutan eksekusi yang valid — task A berjalan sebelum task B, dan tidak pernah sebaliknya secara bersamaan.

Bagaimana cara kerja XCom, dan kapan sebaiknya dihindari?

XCom (cross-communication) memungkinkan pengiriman data berukuran kecil antar task. Sebuah task mendorong nilai return ke XCom; task downstream menariknya. Task SDK menangani hal ini secara otomatis saat mengoper nilai return antar task yang menggunakan dekorator. XCom menyimpan data di metadata database secara default, sehingga tidak cocok untuk dataset berukuran besar (di atas beberapa KB). Untuk mentransfer data berukuran besar, gunakan penyimpanan eksternal (S3, GCS) dan kirimkan hanya path referensi melalui XCom.

Jelaskan perbedaan antara schedule, start_date, dan catchup.

Parameter schedule (yang diubah namanya dari schedule_interval di Airflow 3.x) mendefinisikan seberapa sering DAG berjalan: string cron, timedelta, objek timetable, atau trigger asset. start_date menetapkan tanggal logis paling awal untuk pembuatan DAG run. catchup=True (default) membuat DAG run untuk semua interval yang terlewat antara start_date dan waktu saat ini. Mengatur catchup=False memberi tahu scheduler untuk melewati interval yang sudah berlalu dan hanya menjadwalkan dari waktu sekarang ke depan. Pola umum di produksi: gunakan catchup=False untuk DAG operasional dan catchup=True untuk backfill data historis.

Apa perbedaan KubernetesExecutor dan CeleryExecutor?

CeleryExecutor memelihara pool proses worker yang berjalan terus-menerus, terhubung melalui message broker (Redis atau RabbitMQ). Task diantrekan dan dieksekusi pada worker yang tersedia. KubernetesExecutor membuat pod Kubernetes baru untuk setiap task, menggunakan Docker image dan persyaratan sumber daya yang ditentukan oleh task tersebut. Celery menawarkan latensi lebih rendah (tanpa overhead startup pod) dan bekerja baik untuk beban kerja yang homogen. KubernetesExecutor menyediakan isolasi yang lebih kuat, kontrol sumber daya per task, dan menghilangkan kebutuhan mengelola pool worker statis — ideal untuk beban kerja heterogen di mana task memiliki dependensi dan kebutuhan memori yang berbeda-beda.

Strategi apa yang tersedia untuk menangani kegagalan task?

Airflow menyediakan berbagai mekanisme penanganan kegagalan. retries dan retry_delay mengonfigurasi retry otomatis dengan exponential backoff. on_failure_callback memicu logika kustom (peringatan Slack, insiden PagerDuty) ketika task gagal. trigger_rule mengontrol bagaimana task downstream merespons — all_success (default), one_success, all_failed, atau none_failed_min_one_success. Untuk masalah infrastruktur yang bersifat transien, parameter retry_exponential_backoff=True meningkatkan waktu tunggu antar retry. SLA (kini Deadline Alerts di versi 3.x) memantau durasi eksekusi dan memicu callback ketika task melebihi waktu eksekusi yang diharapkan.

Praktik Terbaik Produksi untuk Deployment Airflow

Menjalankan Airflow secara andal pada skala besar memerlukan perhatian terhadap beberapa pola operasional yang melampaui sekadar penulisan DAG yang benar.

Task yang idempoten. Setiap task harus menghasilkan hasil yang sama ketika dieksekusi berulang kali dengan input yang sama. Gunakan INSERT ... ON CONFLICT atau MERGE sebagai pengganti INSERT biasa. Partisi data output berdasarkan tanggal logis. Pendekatan ini memastikan retry dan backfill yang aman tanpa duplikasi data.

DAG yang kecil dan terfokus. Hindari godaan untuk membangun DAG monolitik dengan puluhan task. Pecah pipeline yang kompleks menjadi beberapa DAG yang terhubung melalui asset (sebelumnya dikenal sebagai dataset). DAG yang lebih kecil lebih cepat di-parse, lebih mudah di-debug, dan memungkinkan restart pipeline secara parsial.

Manajemen koneksi. Simpan seluruh kredensial di connection manager Airflow atau secrets backend eksternal (AWS Secrets Manager, HashiCorp Vault). Jangan pernah menulis kredensial secara langsung di file DAG. Airflow 3.x mengenkripsi field koneksi saat disimpan di metadata database.

Monitoring dan alerting. Ekspor metrik Airflow ke Prometheus atau StatsD. Pantau scheduler_heartbeat, dag_processing.total_parse_time, dan executor.queued_tasks. Airflow 3.2 menambahkan OpenTelemetry traces untuk observabilitas pipeline secara menyeluruh.

Mempersiapkan wawancara data engineering yang mencakup orkestrasi pipeline? SharpSkill menawarkan modul pertanyaan wawancara Airflow dan latihan praktik yang mencerminkan skenario nyata dari lingkungan produksi.

Mulai berlatih!

Uji pengetahuan Anda dengan simulator wawancara dan tes teknis kami.

Kesimpulan

  • Airflow 3.2 memperkenalkan Task SDK (airflow.sdk) sebagai API stabil untuk pembuatan DAG — migrasi import perlu dilakukan sekarang untuk menghindari breaking change di rilis mendatang
  • Asset partition memungkinkan penjadwalan data-aware pada level partisi, mengeliminasi trigger pipeline yang redundan akibat pembaruan data parsial
  • Dukungan async native pada PythonOperator menangani beban kerja I/O-heavy (pengunduhan batch, API fan-out) tanpa memerlukan custom deferrable operator
  • Dynamic task mapping dengan .expand() beradaptasi terhadap ukuran beban kerja yang variabel saat runtime tanpa mengubah kode DAG
  • Persiapan wawancara sebaiknya mencakup mekanisme DAG, trade-off executor (Celery vs. Kubernetes), keterbatasan XCom, dan pola idempotency
  • Deployment produksi mendapat manfaat dari DAG yang kecil dan idempoten, manajemen secret eksternal, dan ekspor metrik ke platform observabilitas

Mulai berlatih!

Uji pengetahuan Anda dengan simulator wawancara dan tes teknis kami.

Tag

#Apache Airflow
#DAG
#Data Pipeline
#Data Engineering
#Python

Bagikan

Artikel terkait