Apache Airflow у 2026 році: оркестрація конвеєрів даних, DAG та питання для співбесіди

Повний посібник з Apache Airflow 3.2: створення DAG за допомогою Task SDK, динамічне мапування задач, партиціоновані ассети, нативна підтримка async та питання для підготовки до співбесіди з data engineering у 2026 році.

Apache Airflow оркестрація конвеєрів даних DAG посібник 2026

Apache Airflow 3.2 є найзначнішою еволюцією open-source платформи оркестрації з моменту фундаментальної архітектурної перебудови у версії 3.0. З понад 10 мільйонами завантажень щомісяця на PyPI та впровадженнями у компаніях від Airbnb до Spotify, Airflow зберігає позицію основного інструмента для побудови, планування та моніторингу конвеєрів даних. Ця стаття охоплює створення DAG за допомогою нового Task SDK, патерни оркестрації конвеєрів даних та питання, які регулярно звучать на технічних співбесідах з data engineering у 2026 році.

Airflow 3.2: головне

Airflow 3.2, випущений у квітні 2026, впроваджує партиціоновані ассети для гранулярного data-aware планування, нативну підтримку асинхронних задач у PythonOperator та механізми мультикомандних розгортань. Усі імпорти DAG тепер використовують стабільний простір імен airflow.sdk, запроваджений в Airflow 3.0.

Створення DAG за допомогою Airflow Task SDK

Airflow 3.0 представив Task SDK — самостійний пакет, який відокремлює визначення DAG від внутрішніх механізмів Airflow. Головна мета полягає у створенні портативних, версійно стабільних DAG, які витримують оновлення Airflow без необхідності змінювати код. Усі ключові об'єкти — DAG, dag, task, BaseOperator, Connection, Variable — відтепер знаходяться у модулі airflow.sdk.

Попередні шляхи імпорту (airflow.decorators.task, airflow.models.dag.DAG) продовжують працювати у версії 3.2, проте позначені як застарілі та будуть вилучені у наступних релізах.

python
# etl_daily_revenue.py
import pendulum
from airflow.sdk import dag, task

@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["finance", "etl"],
)
def etl_daily_revenue():
    @task()
    def extract_transactions() -> list[dict]:
        # Pulls raw transactions from the payments API
        import requests
        response = requests.get("https://api.internal/payments/daily")
        return response.json()["transactions"]

    @task(multiple_outputs=True)
    def transform(transactions: list[dict]) -> dict:
        # Aggregates by currency and computes totals
        totals = {}
        for tx in transactions:
            currency = tx["currency"]
            totals[currency] = totals.get(currency, 0) + tx["amount"]
        return {"totals": totals, "count": len(transactions)}

    @task()
    def load(totals: dict, count: int):
        # Inserts aggregated row into the warehouse
        from warehouse import insert_revenue_summary
        insert_revenue_summary(totals=totals, transaction_count=count)

    raw = extract_transactions()
    result = transform(raw)
    load(result["totals"], result["count"])

etl_daily_revenue()

Декоратор @dag замінює класичний контекстний менеджер with DAG(...). Декоратор @task перетворює звичайні функції Python на задачі Airflow, а серіалізація через XCom відбувається автоматично. Виклик transform(raw) оголошує залежність між задачами — Airflow формує ребра графа DAG на основі ланцюжка викликів функцій.

Варто звернути увагу на параметр multiple_outputs=True у задачі transform. Він дозволяє повертати словник, окремі ключі якого стають доступними як незалежні XCom-значення. Це спрощує передачу структурованих результатів до наступних задач без зайвого шаблонного коду.

Динамічне мапування задач для змінних навантажень

Статичні DAG втрачають ефективність, коли кількість елементів для обробки змінюється між запусками. Динамічне мапування задач, стабільне починаючи з Airflow 2.4 та вдосконалене у серії 3.x, вирішує цю проблему шляхом розгортання задач під час виконання за допомогою методу .expand().

python
# etl_multi_region.py
import pendulum
from airflow.sdk import dag, task

@dag(
    schedule="@hourly",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["regional", "etl"],
)
def etl_multi_region():
    @task()
    def get_regions() -> list[str]:
        # Returns the active regions from config
        return ["us-east-1", "eu-west-1", "ap-southeast-1"]

    @task()
    def process_region(region: str) -> dict:
        # Processes events for a single region
        from data_processor import aggregate_events
        return aggregate_events(region)

    @task()
    def merge_results(results: list[dict]):
        # Combines all regional outputs into a single report
        from reporter import publish_global_report
        publish_global_report(results)

    regions = get_regions()
    # .expand() creates one mapped task instance per region
    processed = process_region.expand(region=regions)
    merge_results(processed)

etl_multi_region()

Під час виконання Airflow створює три паралельні екземпляри process_region — по одному на кожен регіон. Якщо наступного дня з'явиться четвертий регіон, жодних змін у коді DAG не потрібно. Задача merge_results очікує завершення всіх мапованих екземплярів перед початком власного виконання.

Обмеження на кількість мапованих задач

За замовчуванням Airflow обмежує кількість мапованих задач до 1024 екземплярів на одне відображення. Це значення можна перевизначити параметром max_map_length у конфігурації DAG при обробці масштабніших наборів даних.

Партиціоновані ассети в Airflow 3.2

До версії 3.2 data-aware планування функціонувало на рівні цілого ассету: коли DAG-продюсер оновлював ассет, усі DAG-споживачі запускалися незалежно від того, який саме зріз даних зазнав змін. Партиціоновані ассети усувають цю проблему, забезпечуючи гранулярність на рівні окремих партицій.

Для ілюстрації розглянемо сценарій, де три upstream DAG формують щогодинну статистику гравців для різних спортивних ліг. Downstream аналітичний DAG має запускатися виключно тоді, коли всі три ліги опублікували дані за один і той самий часовий інтервал.

python
# downstream_analytics.py
import pendulum
from airflow.sdk import dag, task
from airflow.timetables.assets import CronPartitionTimetable

# Define partitioned assets with hourly granularity
nba_stats = Asset("s3://datalake/nba/hourly/", partitions=CronPartitionTimetable("0 * * * *"))
epl_stats = Asset("s3://datalake/epl/hourly/", partitions=CronPartitionTimetable("0 * * * *"))
nfl_stats = Asset("s3://datalake/nfl/hourly/", partitions=CronPartitionTimetable("0 * * * *"))

@dag(
    # Triggers only when all three assets have matching partition
    schedule=(nba_stats & epl_stats & nfl_stats),
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
)
def unified_sports_analytics():
    @task()
    def aggregate_all_leagues(**context):
        partition = context["partition"]
        # Process only the specific hourly slice across all leagues
        from analytics import build_cross_league_report
        build_cross_league_report(partition=partition)

    aggregate_all_leagues()

unified_sports_analytics()

Оператор & у параметрі schedule гарантує, що DAG запуститься лише тоді, коли всі три ассети матимуть відповідну партицію. Планування на основі партицій усуває надлишкові запуски конвеєрів. Downstream DAG активується виключно за умови наявності потрібної партиції у всіх upstream-джерелах — не раніше й не на частковому наборі даних.

Готовий до співбесід з Data Engineering?

Практикуйся з нашими інтерактивними симуляторами, flashcards та технічними тестами.

Нативні асинхронні задачі

Airflow 3.2 додає нативну підтримку async у PythonOperator. До цього запуск конкурентних I/O-операцій (тисячі API-викликів, масове завантаження файлів) вимагав створення власних deferrable-операторів. Починаючи з версії 3.2, достатньо передати async-функцію безпосередньо у декоратор @task.

python
# async_file_download.py
import asyncio
import aiohttp
from airflow.sdk import dag, task
import pendulum

@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["async", "download"],
)
def async_file_download():
    @task()
    async def download_files():
        urls = [f"https://data.source/files/{i}.csv" for i in range(500)]
        async with aiohttp.ClientSession() as session:
            # Downloads 500 files concurrently instead of sequentially
            tasks = [fetch_and_save(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        return {"downloaded": len(results)}

    download_files()

async def fetch_and_save(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as resp:
        content = await resp.read()
        filename = url.split("/")[-1]
        with open(f"/data/downloads/{filename}", "wb") as f:
            f.write(content)
        return filename

async_file_download()

Асинхронний підхід дозволяє завантажувати 500 файлів конкурентно через один слот воркера замість 500 послідовних HTTP-запитів у синхронній версії. Для I/O-bound навантажень це забезпечує прискорення на порядок величини без необхідності виділяти додаткових воркерів.

Архітектура Airflow: компоненти та потік виконання

Розуміння архітектури Airflow є необхідною умовою як для експлуатації системи у production, так і для успішного проходження технічних співбесід. П'ять компонентів взаємодіють під час кожного запуску DAG:

  • Scheduler — парсить файли DAG, визначає залежності та формує чергу задач для виконання. В Airflow 3.x scheduler працює stateless відносно бази метаданих.
  • Executor — визначає місце виконання задач. LocalExecutor призначений для однонодових конфігурацій. CeleryExecutor розподіляє задачі між воркерами. KubernetesExecutor створює окремий pod для кожної задачі.
  • Workers — виконують безпосередній код задач. Завдяки Task Execution API в Airflow 3.0 воркери комунікують через стабільний контракт, що уможливлює виконання задач у контейнерах, edge-середовищах чи зовнішніх runtime.
  • База метаданих — PostgreSQL (рекомендовано) або MySQL. Зберігає визначення DAG, стани задач, XCom-значення, облікові дані підключень та audit-логи.
  • Веб-сервер — інтерфейс Airflow для моніторингу запусків DAG, перегляду логів, ручного тригерування та керування підключеннями.
Вибір executor для production

Використання SequentialExecutor у production неприпустиме — він виконує лише одну задачу за раз і існує виключно для розробки. У Kubernetes-середовищах KubernetesExecutor забезпечує найсильнішу ізоляцію, оскільки кожна задача отримує власний pod з незалежними ресурсами та залежностями.

Airflow проти Prefect та Dagster у 2026 році

Airflow конкурує з двома вагомими альтернативами. Правильний вибір залежить від розміру команди, наявної інфраструктури та типів конвеєрів, що будуються.

| Характеристика | Airflow 3.2 | Prefect 3.x | Dagster 1.9 | |---|---|---|---| | Визначення DAG | Декоратори Python (airflow.sdk) | Декоратори Python (@flow, @task) | Декоратори Python (@asset, @op) | | Планування | Cron, asset-aware, partition-aware | Cron, event-driven | Cron, sensor-based, asset-aware | | Модель виконання | Централізований scheduler + розподілені воркери | Гібридна (сервер + пули задач) | Централізований dagster-daemon | | Динамічні задачі | .expand() mapped tasks | Нативні цикли Python | Динамічні партиції | | Підтримка async | Нативна з 3.2 | Нативна з 2.0 | Async I/O ops | | Мультикомандна ізоляція | Вбудована (3.2 експериментальна) | На основі workspace (Cloud) | Branch deployments | | Розмір спільноти | Найбільша (35k+ зірок GitHub) | Зростає (18k+ зірок) | Зростає (12k+ зірок) | | Найкраще підходить | Складні мультикомандні конвеєри з усталеною інфраструктурою | Менші команди з фокусом на швидкій ітерації | Організації з орієнтацією на data-продукти |

Перевага Airflow криється в екосистемі: понад 80 пакетів провайдерів, що охоплюють кожен значний хмарний сервіс, базу даних та API. Prefect відзначається кращим досвідом розробника завдяки меншій кількості шаблонного коду. Asset-орієнтована модель Dagster оптимальна для команд, які мислять категоріями продуктів даних, а не послідовностями задач.

Питання для співбесіди: Apache Airflow для data-інженерів

Наведені нижче питання відображають те, що менеджери з найму та senior-інженери справді запитують під час технічних співбесід з data engineering у компаніях, де Airflow працює у production.

Що таке DAG і яку роль він відіграє в Airflow?

DAG (Directed Acyclic Graph, спрямований ациклічний граф) описує робочий процес як сукупність задач із залежностями, гарантуючи відсутність циклічних посилань. Airflow аналізує Python-файли з каталогу dags/, будує граф залежностей, після чого scheduler визначає порядок виконання. Кожен запуск DAG створює об'єкт DagRun, прив'язаний до логічної дати. Обмеження ациклічності забезпечує, що scheduler завжди здатен знайти коректний порядок виконання — задача A виконується перед задачею B, але ніколи навпаки одночасно.

Як функціонує XCom і коли його варто уникати?

XCom (cross-communication) забезпечує передачу невеликих фрагментів даних між задачами. Задача надсилає повернене значення до XCom, а downstream-задачі отримують його. Task SDK обробляє цей механізм автоматично при передачі повернених значень між декорованими задачами. XCom за замовчуванням зберігає дані у базі метаданих, що робить його непридатним для великих обсягів (понад кілька КБ). Для передачі масивних даних доцільно використовувати зовнішнє сховище (S3, GCS) і передавати через XCom виключно шлях-посилання.

Яка різниця між schedule, start_date та catchup?

Параметр schedule (перейменований з schedule_interval в Airflow 3.x) визначає періодичність запуску DAG: cron-вираз, timedelta, об'єкт timetable або тригер на основі ассетів. start_date встановлює найранішу логічну дату, для якої може бути створено запуск DAG. catchup=True (значення за замовчуванням) генерує запуски DAG для всіх пропущених інтервалів між start_date та поточним моментом. catchup=False інструктує scheduler пропустити минулі інтервали та планувати лише від поточного часу. Поширений production-патерн: catchup=False для операційних DAG та catchup=True для історичних backfill.

Чим KubernetesExecutor відрізняється від CeleryExecutor?

CeleryExecutor підтримує пул довготривалих воркер-процесів, з'єднаних через брокер повідомлень (Redis або RabbitMQ). Задачі стають у чергу та виконуються на доступних воркерах. KubernetesExecutor створює новий pod Kubernetes для кожної задачі, використовуючи Docker-образ та вимоги до ресурсів конкретної задачі. Celery пропонує нижчу затримку (відсутній overhead на запуск pod) і добре працює для однорідних навантажень. KubernetesExecutor гарантує сильнішу ізоляцію, контроль ресурсів на рівні окремої задачі та усуває потребу в керуванні статичним пулом воркерів — ідеальне рішення для гетерогенних навантажень з різними вимогами до залежностей та пам'яті.

Які стратегії обробки збоїв задач пропонує Airflow?

Airflow надає низку механізмів обробки збоїв. retries та retry_delay налаштовують автоматичні повторні спроби з експоненціальним відступом. on_failure_callback ініціює виконання довільної логіки (Slack-сповіщення, PagerDuty-інциденти) при збої задачі. trigger_rule контролює поведінку downstream-задач — all_success (за замовчуванням), one_success, all_failed або none_failed_min_one_success. Для транзієнтних інфраструктурних проблем параметр retry_exponential_backoff=True збільшує інтервал між повторними спробами. Механізм SLA (перейменований на Deadline Alerts у 3.x) відстежує тривалість виконання та ініціює callback, коли задачі перевищують очікуваний час роботи.

Найкращі практики для production-розгортань

Надійна робота Airflow у масштабі потребує дотримання операційних патернів, що виходять за рамки коректного написання DAG.

Ідемпотентні задачі. Кожна задача має давати ідентичний результат при багаторазовому виконанні з тими самими вхідними даними. Замість простого INSERT слід використовувати INSERT ... ON CONFLICT або MERGE. Вихідні дані партиціонуються за логічною датою. Це гарантує безпечність повторних спроб та backfill без дублювання даних.

Компактні, сфокусовані DAG. Побудови монолітних DAG з десятками задач слід уникати. Складні конвеєри розбиваються на декілька DAG, з'єднаних через ассети (раніше datasets). Менші DAG швидше парсяться, простіші у дебагінгу та допускають часткові перезапуски конвеєрів.

Керування підключеннями. Усі облікові дані зберігаються у менеджері підключень Airflow або зовнішньому secrets backend (AWS Secrets Manager, HashiCorp Vault). Хардкодити облікові дані у файлах DAG категорично не рекомендується. Airflow 3.x шифрує поля підключень at rest у базі метаданих.

Моніторинг та алертинг. Метрики Airflow слід експортувати до Prometheus або StatsD. Критичні метрики для відстеження: scheduler_heartbeat, dag_processing.total_parse_time та executor.queued_tasks. Airflow 3.2 додає OpenTelemetry traces для наскрізної спостережуваності конвеєрів.

Підготовка до співбесіди з data engineering, що охоплює оркестрацію конвеєрів даних? SharpSkill пропонує спеціалізовані питання для співбесіди з Airflow та практичні модулі, що відтворюють реальні сценарії з production-середовищ.

Починай практикувати!

Перевір свої знання з нашими симуляторами співбесід та технічними тестами.

Висновок

  • Airflow 3.2 запроваджує Task SDK (airflow.sdk) як стабільний API для створення DAG — міграцію імпортів варто виконати зараз, щоб уникнути breaking changes у майбутніх релізах
  • Партиціоновані ассети забезпечують data-aware планування на рівні партицій, усуваючи надлишкові запуски конвеєрів при частковому оновленні даних
  • Нативна підтримка async у PythonOperator обробляє I/O-інтенсивні навантаження (масові завантаження, API fan-out) без потреби у створенні власних deferrable-операторів
  • Динамічне мапування задач через .expand() адаптується до змінних обсягів навантажень під час виконання без модифікації коду DAG
  • Підготовка до співбесід має охоплювати механіку DAG, компроміси між executor-ами (Celery проти Kubernetes), обмеження XCom та патерни ідемпотентності
  • Production-розгортання виграють від компактних ідемпотентних DAG, зовнішнього керування секретами та експорту метрик до платформ спостережуваності

Починай практикувати!

Перевір свої знання з нашими симуляторами співбесід та технічними тестами.

Теги

#airflow
#data-engineering
#python
#pipeline-orchestration
#interview

Поділитися

Пов'язані статті