DjangoとCelery:非同期タスク処理と面接対策ガイド 2026

DjangoとCeleryを使った非同期タスク処理の実装方法を解説。タスクルーティング、Celery Beatによるスケジューリング、本番環境設定、2026年最新の技術面接質問も網羅。

DjangoとCeleryによる非同期タスク処理

DjangoとCeleryは、Pythonウェブアプリケーションにおける非同期タスク処理の中核を担っています。2026年にリリースされたCelery 5.6とDjango 6.0では、それぞれ重要なアップデートが行われており、分散タスクキューの理解はバックエンドエンジニアの面接においても、実際の運用環境においても不可欠なスキルです。

重要ポイント

Celeryは、時間のかかる処理をリクエスト・レスポンスサイクルから切り離します。Djangoのビューがメッセージブローカー(RedisまたはRabbitMQ)にタスクを送信し、別のワーカープロセスがそれを非同期で実行します。このパターンは、メール送信、レポート生成、画像処理など、ユーザーをブロックするあらゆるワークロードに適用できます。

CeleryとDjangoの統合アーキテクチャ

CeleryはDjangoプロジェクトのコードベースを共有しながら、独立したプロセスとして動作します。この統合は、Djangoアプリケーション(プロデューサー)、メッセージブローカー(RedisまたはRabbitMQ)、1つ以上のCeleryワーカー(コンシューマー)という3つのコンポーネントで構成されています。Djangoのビューで.delay().apply_async()を呼び出すと、タスクはシリアライズされてブローカーキューに送信されます。ワーカーがそのタスクを取得し、自身のプロセス内で実行します。

セットアップは、Djangoプロジェクトルートのcelery.pyモジュールから始まります。

python
# myproject/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

autodiscover_tasks()はインストール済みのすべてのDjangoアプリからtasks.pyモジュールを自動検出し、デコレートされた関数をCeleryタスクとして登録します。

settings.pyでブローカーとリザルトバックエンドを設定します。

python
# myproject/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'

多くのDjangoデプロイメントでは、Redisがブローカーとリザルトバックエンドの両方を兼務しています。クォーラムキューや高度なルーティングが必要なアプリケーションでは、RabbitMQがより適切な選択肢となります。この機能はCelery 5.5以降で完全にサポートされています。

Celeryタスクの作成とディスパッチ

Celeryタスクは、@shared_taskデコレーターを付けた通常のPython関数です。shared_taskデコレーターはCeleryアプリインスタンスをハードコードしないため、複数のDjangoアプリ間でタスクを再利用できます。

python
# orders/tasks.py
from celery import shared_task
from django.core.mail import send_mail
from orders.models import Order

@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def send_order_confirmation(self, order_id: int) -> str:
    """Send confirmation email for a completed order."""
    try:
        order = Order.objects.select_related('user').get(id=order_id)
        send_mail(
            subject=f'Order #{order.id} Confirmed',
            message=f'Your order totaling {order.total} has been confirmed.',
            from_email='noreply@example.com',
            recipient_list=[order.user.email],
        )
        return f'Email sent for order {order_id}'
    except Order.DoesNotExist:
        return f'Order {order_id} not found'
    except Exception as exc:
        # Retry on transient failures (SMTP timeout, etc.)
        raise self.retry(exc=exc)

このコードには3つの重要なパターンがあります。第一に、bind=Trueによりタスクがselfにアクセスでき、リトライが可能になります。第二に、関数はORMオブジェクトではなくorder_idという整数値を受け取ります。モデルインスタンスはプロセス間の境界を超えて確実にシリアライズできないためです。第三に、self.retry(exc=exc)は一時的な障害発生時にタスクを指数バックオフで再キューイングします。

Djangoビューからのディスパッチ方法は以下の通りです。

python
# orders/views.py
from orders.tasks import send_order_confirmation

def complete_order(request, order_id):
    # ... process payment, update order status ...
    send_order_confirmation.delay(order_id)
    return redirect('order_success', order_id=order_id)

.delay()の呼び出しは即座にリターンします。メールタスクがバックグラウンドで実行されている間、ユーザーには成功ページが表示されます。

タスクルーティングとキュー管理

本番環境では、すべてのタスクを単一のデフォルトキューに投入すべきではありません。処理の遅いレポート生成タスクが、同じワーカープールを共有する高速な通知タスクのリソースを奪ってしまう可能性があるためです。Celeryはタスクルーティングでこの問題を解決します。

python
# myproject/settings.py
CELERY_TASK_ROUTES = {
    'orders.tasks.send_order_confirmation': {'queue': 'notifications'},
    'reports.tasks.generate_monthly_report': {'queue': 'reports'},
    'images.tasks.resize_upload': {'queue': 'media'},
}

ワーカーはキューごとに起動します。

bash
# 通知ワーカー(高速タスク、高並行性)
celery -A myproject worker -Q notifications -c 8 --loglevel=info

# レポートワーカー(低速タスク、並行性制限)
celery -A myproject worker -Q reports -c 2 --loglevel=info

# メディアワーカー(CPU集約型、preforkプール)
celery -A myproject worker -Q media -c 4 -P prefork --loglevel=info

このように分離することで、リソースの競合を防ぎます。通知ワーカーは8つの並行スレッドで高スループットを処理し、レポートワーカーは大規模データセットのクエリによるメモリ枯渇を避けるため2タスクのみ並行実行します。

Djangoの面接対策はできていますか?

インタラクティブなシミュレーター、flashcards、技術テストで練習しましょう。

Celery Beatによる定期タスク

Celery Beatは定期的なタスク実行のための組み込みスケジューラーです。設定された間隔でブローカーにタスクを送信する独立したプロセスとして動作します。

python
# myproject/settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'cleanup-expired-sessions': {
        'task': 'accounts.tasks.cleanup_expired_sessions',
        'schedule': crontab(hour=3, minute=0),  # 毎日UTC 3:00
    },
    'sync-inventory': {
        'task': 'inventory.tasks.sync_external_inventory',
        'schedule': 300.0,  # 5分ごと
    },
    'generate-weekly-digest': {
        'task': 'notifications.tasks.send_weekly_digest',
        'schedule': crontab(hour=9, minute=0, day_of_week='monday'),
    },
}

Beatはワーカーと並行して起動します。

bash
celery -A myproject beat --loglevel=info

本番環境でよくある落とし穴として、複数のBeatインスタンスを実行するとタスクが重複実行されます。デプロイメントごとにBeatプロセスは1つだけ稼働させるべきです。django-celery-beatを使用すると、スケジュールをデータベースに保存し、Django管理画面から実行時に変更できるようになります。

FlowerによるモニタリングとObservability

Flowerは、Celeryワーカー、タスク、キューをリアルタイムで監視するWebダッシュボードです。タスクの成功率、実行時間、キューの深さ、ワーカーのステータスを可視化できます。

bash
# Flowerのインストールと実行
pip install flower
celery -A myproject flower --port=5555

本番環境では、Flowerのメトリクスをアラートシステムに連携すべきです。キューの深さが増加し続ける場合はワーカーのキャパシティ不足を示し、特定タスクのリトライ率が高い場合は外部サービスの不安定さを示唆します。

Celery 5.6では構造化ログの改善も導入され、DatadogやELKスタックなどのJSONログアグリゲーターとの連携が強化されています。リリースノートによると、デバッグ効率が30%向上しています。

Django 6.0の組み込みタスク vs. Celery

Django 6.0ではネイティブのバックグラウンドタスクフレームワークが実装され、Celeryがまだ必要かどうかという議論が生まれています。答えはユースケースによって異なります。

Djangoの組み込みタスクフレームワークは、メール送信、キャッシュクリア、軽量なデータ処理など、シンプルなバックグラウンド操作を別途ブローカーインフラストラクチャを必要とせずに処理できます。

Celeryが不可欠なケースは以下の通りです。

  • 分散処理 — 複数マシンにまたがる処理
  • 優先キュー とタスクルーティング
  • レート制限 と高度なリトライポリシー
  • 定期スケジューリング (Celery Beat)
  • 結果追跡 — 設定可能なバックエンド
  • Canvasワークフロー (チェーン、グループ、コード)による複雑なタスクオーケストレーション

ファイアアンドフォーゲット型のバックグラウンド処理だけが必要な場合、Django 6.0の組み込みソリューションで運用の複雑さを軽減できます。分散ワーカー、スケジューリング、タスク合成が必要な場合は、Celery 5.6が実績のある選択肢です。

面接で頻出する質問:DjangoとCelery

バックエンドエンジニアの技術面接では、Celeryに関する知識が頻繁にテストされます。以下は、スタートアップからFAANGまで、実際のDjango面接シナリオから抽出した質問です。

Q: タスクにモデルインスタンスではなくプライマリキーを渡すべき理由は?

Djangoのモデルインスタンスは、データベースコネクション、クエリセット、遅延ロードされたリレーションを保持しており、これらはシリアライズに耐えることができません。Orderオブジェクトではなくorder_idを渡すことで、タスクがデータベースから最新のデータを取得し、陳腐化したステートやシリアライズエラーを回避できます。

Q: self.retry()とタスクの手動再ディスパッチの違いは?

self.retry()はリトライカウントを保持し、設定されたmax_retriesの制限を適用し、デフォルトで指数バックオフを使用します。手動で.delay()を再度呼び出すと、リトライカウンターがリセットされた新しいタスクが生成され、永続的な障害に対して無限リトライループに陥る可能性があります。

Q: Celeryワーカーがタスク実行中にクラッシュした場合、何が起こるか?

動作はacks_lateの設定に依存します。acks_late=Trueの場合、アクノリッジメントが送信されていないため、ブローカーはメッセージを別のワーカーに再配信します。デフォルトのacks_late=Falseでは、実行前にメッセージがアクノリッジされるため、クラッシュするとタスクは失われます。重要なワークロードを扱う本番システムでは、acks_late=Trueと冪等なタスク設計を組み合わせるべきです。

Q: .delay().apply_async()、タスクの直接呼び出しの違いを説明してください。

.delay(*args).apply_async(args=args)のシンタックスシュガーです。.apply_async()countdownetaqueuepriorityexpiresなどの追加オプションを受け付けます。タスク関数を直接呼び出す(.delay()なし)と、現在のプロセス内で同期的に実行されます。テストには便利ですが、非同期処理の意味がなくなります。

Q: キャッシュレイヤーはCeleryタスクの結果とどう連携するか?

Celeryのリザルトバックエンドに保存されたタスク結果は、Djangoのキャッシュフレームワークを使ってキャッシュできます。ビューはまずキャッシュを確認し、存在しない場合はタスクをディスパッチしてAsyncResultのIDを保存します。後続のリクエストはキャッシュされたIDを介してリザルトバックエンドをポーリングし、タスク完了後に最終結果をキャッシュします。

本番デプロイメントチェックリスト

Djangoと並行してCeleryをデプロイする際には、面接でも問われるいくつかの運用上の懸念事項に注意が必要です。

python
# myproject/settings.py — 本番環境設定
CELERY_TASK_ALWAYS_EAGER = False  # 本番では絶対にTrueにしない
CELERY_TASK_ACKS_LATE = True  # ワーカークラッシュ時の再配信
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # フェアスケジューリング
CELERY_TASK_REJECT_ON_WORKER_LOST = True  # 予期しない終了時に拒否
CELERY_TASK_TIME_LIMIT = 300  # 5分後にハードキル
CELERY_TASK_SOFT_TIME_LIMIT = 240  # 4分後にSoftTimeLimitExceeded
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000  # メモリリーク防止
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True

WORKER_MAX_TASKS_PER_CHILD設定は1000タスクごとにワーカープロセスを再起動し、メモリリークを防止します。Celery 5.6ではこの問題がメモリリークパッチでさらに広範に対処されています。

systemdまたはsupervisorによるプロセス管理は、障害発生時のワーカー自動再起動を保証します。最小限のsystemdユニットファイルは以下の通りです。

ini
# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery Worker
After=network.target redis.service

[Service]
Type=forking
User=django
Group=django
WorkingDirectory=/opt/myproject
ExecStart=/opt/myproject/venv/bin/celery -A myproject worker \
    --loglevel=info --concurrency=4 --pidfile=/var/run/celery/worker.pid
ExecStop=/bin/kill -s TERM $MAINPID
Restart=always

[Install]
WantedBy=multi-user.target

本番環境でのミドルウェア統合とシグナルハンドリングのパターンについては、Djangoデプロイメントガイドでさらに詳しく解説されています。

今すぐ練習を始めましょう!

面接シミュレーターと技術テストで知識をテストしましょう。

まとめ

  • Celery 5.6.3(2026年3月リリース)では、メモリリーク修正、構造化ログ、クォーラムキュー対応、psycopg3互換性が追加されています。旧バージョンからのアップグレードが推奨されます。
  • タスクには必ずプライマリキーを渡し、ORMオブジェクトは渡さないでください。シリアライズエラーとデータの陳腐化を防止できます。
  • ワークロードの特性に基づいてタスクを専用キューにルーティングしてください。高速な通知と低速なレポートが同じワーカーで競合しないようにします。
  • 重要なワークロードにはacks_late=Trueと冪等なタスク設計を使用し、ワーカークラッシュに対する耐障害性を確保してください。
  • すべてのタスクにtime_limitsoft_time_limitの両方を設定し、ハングしたワーカーがリソースを無限に消費するのを防止してください。
  • Django 6.0の組み込みタスクフレームワークはシンプルなバックグラウンドジョブを処理できますが、分散処理、スケジューリング、Canvasワークフローには引き続きCeleryが必要です。
  • Flowerまたは構造化ログアグリゲーションでキューの深さとリトライ率を監視し、ユーザーに影響が出る前にキャパシティの問題を検出してください。

タグ

#django
#celery
#python
#async
#interview

共有

関連記事