Apache Airflow 2026年版:パイプラインオーケストレーション、DAGと面接対策ガイド

Apache Airflow 3.2のTask SDKによるDAG構築、アセットパーティション、ネイティブ非同期タスク、データエンジニア面接でよく問われる質問を網羅的に解説するチュートリアルです。

Apache Airflow 2026年版:パイプラインオーケストレーション、DAGと面接対策ガイド

Apache Airflow 3.2は、3.0のアーキテクチャ刷新以降、最も重要な進化を遂げたリリースである。PyPIでの月間ダウンロード数は1,000万を超え、AirbnbからSpotifyまで幅広い企業で採用されており、データパイプラインの構築・スケジューリング・監視における主要ツールとしての地位を確立している。本チュートリアルでは、新しいTask SDKを用いたDAG作成、パイプラインオーケストレーションのパターン、そしてデータエンジニア職の面接で頻出する質問を取り上げる。

Airflow 3.2の主要ポイント

Airflow 3.2は2026年4月にリリースされ、きめ細かなデータ駆動スケジューリングを実現するアセットパーティション、PythonOperatorでのネイティブ非同期サポート、マルチチームデプロイメントが導入されました。すべてのDAGインポートは、Airflow 3.0で導入された安定したairflow.sdk名前空間を使用します。

Airflow Task SDKによるDAG作成

Airflow 3.0で導入されたTask SDKは、DAG定義をAirflowの内部実装から分離する独立パッケージである。その目的は、Airflowのアップグレードを経てもコード変更なしに動作する、ポータブルで安定したDAGを記述することにある。DAGdagtaskBaseOperatorConnectionVariableなどのコアオブジェクトはすべてairflow.sdkに配置されている。

レガシーのインポートパス(airflow.decorators.taskairflow.models.dag.DAG)は3.2でも動作するが、非推奨としてマークされており、将来のリリースで削除される予定である。

python
# etl_daily_revenue.py
import pendulum
from airflow.sdk import dag, task

@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["finance", "etl"],
)
def etl_daily_revenue():
    @task()
    def extract_transactions() -> list[dict]:
        # Pulls raw transactions from the payments API
        import requests
        response = requests.get("https://api.internal/payments/daily")
        return response.json()["transactions"]

    @task(multiple_outputs=True)
    def transform(transactions: list[dict]) -> dict:
        # Aggregates by currency and computes totals
        totals = {}
        for tx in transactions:
            currency = tx["currency"]
            totals[currency] = totals.get(currency, 0) + tx["amount"]
        return {"totals": totals, "count": len(transactions)}

    @task()
    def load(totals: dict, count: int):
        # Inserts aggregated row into the warehouse
        from warehouse import insert_revenue_summary
        insert_revenue_summary(totals=totals, transaction_count=count)

    raw = extract_transactions()
    result = transform(raw)
    load(result["totals"], result["count"])

etl_daily_revenue()

@dagデコレータは、従来のwith DAG(...)コンテキストマネージャを置き換えるものである。@taskデコレータは通常のPython関数をAirflowタスクに変換し、XComのシリアライゼーションは自動的に処理される。transform(raw)を呼び出すことで依存関係が宣言され、Airflowは関数呼び出しグラフからDAGのエッジを構築する。

可変ワークロードに対応するダイナミックタスクマッピング

処理対象の項目数が実行ごとに変わる場合、静的なDAGでは対応できない。Airflow 2.4で安定版となり、3.xでさらに改良されたダイナミックタスクマッピングは、.expand()を使用して実行時にタスクを展開することでこの問題を解決する。

python
# etl_multi_region.py
import pendulum
from airflow.sdk import dag, task

@dag(
    schedule="@hourly",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["regional", "etl"],
)
def etl_multi_region():
    @task()
    def get_regions() -> list[str]:
        # Returns the active regions from config
        return ["us-east-1", "eu-west-1", "ap-southeast-1"]

    @task()
    def process_region(region: str) -> dict:
        # Processes events for a single region
        from data_processor import aggregate_events
        return aggregate_events(region)

    @task()
    def merge_results(results: list[dict]):
        # Combines all regional outputs into a single report
        from reporter import publish_global_report
        publish_global_report(results)

    regions = get_regions()
    # .expand() creates one mapped task instance per region
    processed = process_region.expand(region=regions)
    merge_results(processed)

etl_multi_region()

実行時にAirflowはprocess_regionの並列インスタンスをリージョンごとに3つ作成する。翌日に4番目のリージョンが追加されても、DAGのコード変更は不要である。merge_resultsタスクは、すべてのマップされたインスタンスが完了するまで実行を待機する。

マップされたタスクの上限

デフォルトでは、Airflowはマッピングごとに1024インスタンスまでに制限されています。より大規模なデータセットを処理する場合は、DAG設定のmax_map_lengthでこの値をオーバーライドしてください。

アセットパーティション:Airflow 3.2のデータ駆動スケジューリング

Airflow 3.2以前は、データ駆動スケジューリングはアセットレベルで動作しており、プロデューサーDAGがアセットを更新すると、どのデータスライスが変更されたかに関係なく、すべてのコンシューマーDAGがトリガーされていた。アセットパーティションは、パーティションレベルの粒度を実現することでこの問題を解決する。

3つの上流DAGが異なるスポーツリーグの時間別選手統計を生成するシナリオを考えてみよう。下流の分析DAGは、3つのリーグすべてが同じ時間帯のデータを公開した場合にのみトリガーされるべきである。

python
# downstream_analytics.py
import pendulum
from airflow.sdk import dag, task
from airflow.timetables.assets import CronPartitionTimetable

# Define partitioned assets with hourly granularity
nba_stats = Asset("s3://datalake/nba/hourly/", partitions=CronPartitionTimetable("0 * * * *"))
epl_stats = Asset("s3://datalake/epl/hourly/", partitions=CronPartitionTimetable("0 * * * *"))
nfl_stats = Asset("s3://datalake/nfl/hourly/", partitions=CronPartitionTimetable("0 * * * *"))

@dag(
    # Triggers only when all three assets have matching partition
    schedule=(nba_stats & epl_stats & nfl_stats),
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
)
def unified_sports_analytics():
    @task()
    def aggregate_all_leagues(**context):
        partition = context["partition"]
        # Process only the specific hourly slice across all leagues
        from analytics import build_cross_league_report
        build_cross_league_report(partition=partition)

    aggregate_all_leagues()

unified_sports_analytics()

パーティション化されたスケジューリングにより、冗長なパイプライン実行が排除される。下流DAGは、すべての上流ソースで必要なパーティションが揃った場合にのみ起動し、部分的なデータでは起動しない。

Data Engineeringの面接対策はできていますか?

インタラクティブなシミュレーター、flashcards、技術テストで練習しましょう。

I/O集約型ワークロード向けネイティブ非同期タスク

Airflow 3.2では、PythonOperatorにネイティブ非同期サポートが追加された。以前は、大量のAPI呼び出しやバッチファイルダウンロードなどの並行I/O操作を実行するには、カスタムのdeferrableオペレータを作成する必要があった。現在は、async関数を直接渡すことができる。

python
# async_file_download.py
import asyncio
import aiohttp
from airflow.sdk import dag, task
import pendulum

@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["async", "download"],
)
def async_file_download():
    @task()
    async def download_files():
        urls = [f"https://data.source/files/{i}.csv" for i in range(500)]
        async with aiohttp.ClientSession() as session:
            # Downloads 500 files concurrently instead of sequentially
            tasks = [fetch_and_save(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        return {"downloaded": len(results)}

    download_files()

async def fetch_and_save(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as resp:
        content = await resp.read()
        filename = url.split("/")[-1]
        with open(f"/data/downloads/{filename}", "wb") as f:
            f.write(content)
        return filename

async_file_download()

非同期アプローチでは、単一のワーカースロットで500ファイルを並行してダウンロードできる。同期バージョンでの500回の逐次HTTPリクエストと比較して、I/Oバウンドのワークロードでは桁違いの高速化が実現され、追加のワーカーをプロビジョニングする必要がない。

Airflowアーキテクチャ:コンポーネントと実行フロー

Airflowのアーキテクチャを理解することは、本番環境での運用と面接の両方において不可欠である。すべてのDAG実行において、5つのコンポーネントが相互作用する。

  • スケジューラ - DAGファイルを解析し、依存関係を解決し、実行のためにタスクをキューイングする。Airflow 3.xでは、スケジューラはメタデータベースに対してステートレスに動作する。
  • エグゼキュータ - タスクの実行場所を決定する。LocalExecutorは単一マシンでの処理に対応する。CeleryExecutorはワーカーノード間で分散させる。KubernetesExecutorはタスクごとにPodを起動する。
  • ワーカー - 実際のタスクコードを実行する。Airflow 3.0のTask Execution APIにより、ワーカーは安定したコントラクトを通じて通信し、コンテナ、エッジ環境、外部ランタイムでの実行を可能にする。
  • メタデータベース - PostgreSQL(推奨)またはMySQL。DAG定義、タスク状態、XCom値、接続情報、監査ログを保存する。
  • Webサーバー - DAG実行の監視、ログの検査、手動実行のトリガー、接続管理を行うAirflow UIである。
本番環境でのエグゼキュータ選択

本番環境ではSequentialExecutorの使用を避けてください。一度に1つのタスクしか実行できず、開発用途としてのみ存在します。Kubernetesネイティブな環境では、KubernetesExecutorが最も強力な分離を提供します。各タスクが独自のリソースと依存関係を持つ個別のPodで実行されるためです。

2026年のAirflow vs. Prefect vs. Dagster比較

Airflowには2つの有力な代替ツールが存在する。適切な選択は、チームの規模、既存のインフラストラクチャ、構築するパイプラインの種類によって異なる。

| 機能 | Airflow 3.2 | Prefect 3.x | Dagster 1.9 | |---|---|---|---| | DAG定義 | Pythonデコレータ(airflow.sdk) | Pythonデコレータ(@flow@task) | Pythonデコレータ(@asset@op) | | スケジューリング | Cron、アセット駆動、パーティション対応 | Cron、イベント駆動 | Cron、センサーベース、アセット駆動 | | 実行モデル | 集中スケジューラ+分散ワーカー | ハイブリッド(サーバー+ワークプール) | 集中dagster-daemon | | 動的タスク | .expand()マップドタスク | ネイティブPythonループ | ダイナミックパーティション | | 非同期サポート | 3.2でネイティブ対応 | 2.0からネイティブ対応 | 非同期I/Oオペレーション | | マルチチーム分離 | 内蔵(3.2実験的機能) | ワークスペースベース(Cloud) | ブランチデプロイメント | | コミュニティ規模 | 最大(GitHub 35,000スター以上) | 成長中(18,000スター以上) | 成長中(12,000スター以上) | | 最適な用途 | 大規模で複雑なマルチチームパイプライン | 素早い開発サイクルを求める小規模チーム | データアセット中心の組織 |

Airflowの強みはそのエコシステムにある。主要なクラウドサービス、データベース、APIをカバーする80以上のプロバイダーパッケージが利用可能である。Prefectはボイラープレートの少ない優れた開発者体験に強みがある。Dagsterのアセット中心モデルは、タスクシーケンスではなくデータプロダクトの観点で思考するチームに適している。

データエンジニア面接で問われるApache Airflowの質問

以下の質問は、Airflowを本番環境で運用している企業のデータエンジニア面接で実際に問われる内容を反映している。

DAGとは何か、Airflowはどのように使用するか

DAG(有向非巡回グラフ)は、循環参照がないことが保証されたタスクとその依存関係の集合としてワークフローを定義する。Airflowはdags/フォルダ内のPythonファイルを解析して依存関係グラフを構築し、スケジューラが実行順序を決定する。各DAG実行は論理日付に紐づくDagRunオブジェクトを作成する。「非巡回」の制約により、スケジューラは常に有効な実行順序を見つけることができる。

XComの仕組みと、使用を避けるべき場面は何か

XCom(クロスコミュニケーション)は、タスク間で少量のデータを受け渡すための仕組みである。タスクが戻り値をXComにプッシュし、下流タスクがそれをプルする。Task SDKでは、デコレートされたタスク間で関数の戻り値を渡す際にこれが自動的に処理される。XComはデフォルトでメタデータベースにデータを格納するため、大容量データ(数KB以上)の転送には不向きである。大きなデータを転送する場合は、外部ストレージ(S3、GCS)を使用し、XComには参照パスのみを渡すべきである。

schedulestart_datecatchupの違いについて説明せよ

scheduleパラメータ(Airflow 3.xでschedule_intervalから名称変更)は、DAGの実行頻度を定義する。cron文字列、timedelta、タイムテーブルオブジェクト、またはアセットトリガーを指定できる。start_dateは、DAG実行を作成できる最も早い論理日付を設定する。catchup=True(デフォルト)は、start_dateから現在までの未実行間隔すべてについてDAG実行を作成する。catchup=Falseを設定すると、過去の間隔をスキップし、現在時刻からのみスケジューリングを行う。本番での一般的なパターンとして、運用DAGにはcatchup=Falseを、履歴バックフィルにはcatchup=Trueを設定する。

KubernetesExecutorとCeleryExecutorの違いは何か

CeleryExecutorは、メッセージブローカー(RedisまたはRabbitMQ)を介して接続された長時間稼働のワーカープロセスプールを維持する。タスクはキューに入り、利用可能なワーカーで実行される。KubernetesExecutorは、タスクごとにDockerイメージとリソース要件を使用して新しいKubernetes Podを作成する。Celeryは低レイテンシ(Pod起動のオーバーヘッドがない)を提供し、均質なワークロードに適している。KubernetesExecutorはより強力な分離、タスクごとのリソース制御を提供し、静的なワーカープールの管理が不要なため、異種のワークロードに最適である。

タスク障害の処理にはどのような戦略があるか

Airflowは複数の障害処理メカニズムを提供する。retriesretry_delayで指数バックオフ付きの自動リトライを設定できる。on_failure_callbackはタスク失敗時にカスタムロジック(Slackアラート、PagerDutyインシデント)をトリガーする。trigger_ruleで下流タスクの応答方法を制御する。all_success(デフォルト)、one_successall_failednone_failed_min_one_successが利用可能である。一時的なインフラ障害に対しては、retry_exponential_backoff=Trueパラメータでリトライ間の待機時間を増加させる。SLA(3.xではDeadline Alerts)は実行時間を監視し、タスクが予想される実行時間を超えた場合にコールバックを発火する。

Airflow本番デプロイメントのベストプラクティス

Airflowを大規模に安定稼働させるには、正しいDAGを書くこと以上に、いくつかの運用パターンへの注意が必要である。

冪等なタスク。 すべてのタスクは、同じ入力で複数回実行しても同じ結果を生成するべきである。単純なINSERTの代わりにINSERT ... ON CONFLICTMERGEを使用する。出力データを論理日付でパーティショニングする。これにより、データの重複なしに安全なリトライとバックフィルが可能になる。

小さく集中したDAG。 数十のタスクを持つモノリシックなDAGの構築は避けるべきである。複雑なパイプラインは、アセット(旧datasets)で接続された複数のDAGに分割する。小さなDAGはパースが速く、デバッグが容易で、部分的なパイプラインの再起動が可能である。

接続管理。 すべての認証情報をAirflowの接続マネージャまたは外部シークレットバックエンド(AWS Secrets Manager、HashiCorp Vault)に保存する。DAGファイルに認証情報をハードコードしてはならない。Airflow 3.xはメタデータベース内の接続フィールドを保存時に暗号化する。

監視とアラート。 AirflowメトリクスをPrometheusまたはStatsDにエクスポートする。scheduler_heartbeatdag_processing.total_parse_timeexecutor.queued_tasksを監視する。Airflow 3.2ではOpenTelemetryトレースが追加され、パイプラインのエンドツーエンドの可観測性が実現されている。

パイプラインオーケストレーションを含むデータエンジニア面接の準備をしている方は、SharpSkillのAirflow面接対策モジュールをご活用いただきたい。本番環境の実際のシナリオに基づいた練習問題が用意されている。

今すぐ練習を始めましょう!

面接シミュレーターと技術テストで知識をテストしましょう。

まとめ

  • Airflow 3.2ではTask SDK(airflow.sdk)がDAG作成の安定APIとして導入された。将来の破壊的変更を避けるため、インポートの移行を推奨する
  • アセットパーティションにより、パーティションレベルのデータ駆動スケジューリングが可能になり、部分的なデータ更新による冗長なパイプライントリガーが排除される
  • PythonOperatorのネイティブ非同期サポートにより、カスタムのdeferrableオペレータなしでI/O集約型ワークロードに対応できる
  • .expand()によるダイナミックタスクマッピングは、DAGコードの変更なしに可変のワークロードサイズに実行時に適応する
  • 面接準備では、DAGの仕組み、エグゼキュータのトレードオフ(Celery vs. Kubernetes)、XComの制約、冪等性パターンをカバーすべきである
  • 本番デプロイメントでは、小さく冪等なDAG、外部シークレット管理、可観測性プラットフォームへのメトリクスエクスポートが有効である

今すぐ練習を始めましょう!

面接シミュレーターと技術テストで知識をテストしましょう。

共有

関連記事