Symfony Messenger完全解説:キュー、ワーカー、非同期アーキテクチャの面接対策2026

Symfony Messengerのメッセージバスアーキテクチャ、トランスポート設定、ワーカー管理、重複排除ミドルウェア、リトライ戦略、Symfony 7.3+のストリーミングAMQPトランスポートまで徹底解説。2026年の技術面接で問われるポイントを網羅。

Symfony Messenger 非同期キューとワーカーのアーキテクチャ図

Symfony Messengerは、PHPにおける非同期ワークロード処理の中核コンポーネントである。構造化されたメッセージバス、専用トランスポート、監視対象のワーカーを通じて、複雑な非同期処理を管理する。Symfony 7.3および8.0では、重複排除ミドルウェア、ストリーミングAMQPトランスポート、Doctrineキープアライブなどが追加され、Laravel HorizonやSidekiqに匹敵する機能を備えるに至った。

Symfony 7.3+ Messenger の主要な進化

Symfony 7.3では、DeduplicateMiddleware(自動メッセージ重複排除)、Doctrineトランスポートのキープアライブ(長時間処理タスクの再配信防止)、#[AsMessage]属性(宣言的トランスポートルーティング)が追加された。

Messengerのアーキテクチャ:バス、トランスポート、ワーカー

Messengerコンポーネントは3つの関心事を分離している。ディスパッチ(バス)、配信(トランスポート)、処理(ワーカー)である。メッセージはプレーンなPHPオブジェクトであり、ハンドラーは呼び出し可能なクラスである。バスが両者を接続し、ミドルウェアを介してルーティングを行い、必要に応じてメッセージをトランスポートにシリアライズして非同期処理を実現する。

src/Message/InvoiceGenerated.phpphp
namespace App\Message;

final readonly class InvoiceGenerated
{
    public function __construct(
        public int $orderId,
        public string $customerEmail,
    ) {}
}

このメッセージにはスカラーデータのみが含まれており、Doctrineエンティティは渡していない。オブジェクトではなくIDを渡すことで、シリアライズの問題を回避し、メッセージを軽量に保つことができる。ハンドラーは処理時にデータベースから最新のデータを取得する。

src/MessageHandler/InvoiceGeneratedHandler.phpphp
namespace App\MessageHandler;

use App\Message\InvoiceGenerated;
use App\Service\InvoiceService;
use App\Service\MailerService;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final readonly class InvoiceGeneratedHandler
{
    public function __construct(
        private InvoiceService $invoiceService,
        private MailerService $mailerService,
    ) {}

    public function __invoke(InvoiceGenerated $message): void
    {
        $pdf = $this->invoiceService->generatePdf($message->orderId);
        $this->mailerService->sendInvoice(
            $message->customerEmail,
            $pdf,
        );
    }
}

コントローラーやサービスからメッセージをディスパッチする方法は非常にシンプルである。

src/Controller/OrderController.phpphp
$this->bus->dispatch(new InvoiceGenerated(
    orderId: $order->getId(),
    customerEmail: $order->getCustomer()->getEmail(),
));

同期処理と非同期処理のどちらで実行するかは、トランスポートルーティングの設定によって決まる。

トランスポート設定とキューバックエンド

Messengerは、Doctrine DBAL、Redis、Amazon SQS、Beanstalkd、AMQP(RabbitMQ)、および2025年に導入されたストリーミングAMQPトランスポートをサポートしている。各トランスポートはDSN文字列で定義される。

yaml
# config/packages/messenger.yaml
framework:
    messenger:
        failure_transport: failed

        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: high
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 3
                    max_delay: 60000

            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low
                retry_strategy:
                    max_retries: 5
                    delay: 5000
                    multiplier: 2

            failed:
                dsn: 'doctrine://default?queue_name=failed'

        routing:
            'App\Message\InvoiceGenerated': async_priority_high
            'App\Message\CleanupTempFiles': async_priority_low

優先度別にトランスポートを分割することで、時間に敏感なタスク(請求書生成)がハウスキーピングタスク(一時ファイル削除)より先に処理されることが保証される。各トランスポートには独自のリトライ戦略が設定され、メッセージの重要度と冪等性に応じて調整される。

Doctrine vs. Redis vs. AMQP の比較

Doctrineトランスポートは追加のインフラストラクチャが不要だが、データベースへの負荷が増加する。Redisはサブミリ秒のレイテンシを提供する。AMQP(RabbitMQ)は高度なルーティング、デッドレターエクスチェンジ、高スループットシナリオ向けのストリーミングトランスポートを備えている。既存のインフラストラクチャとスループット要件に基づいて選択するのが望ましい。

Supervisorによるワーカー管理

ワーカーはトランスポートからメッセージを消費する。本番環境では、messenger:consumeコマンドをSupervisorやsystemdなどのプロセスマネージャーの下で実行する。

bash
# 高優先度メッセージを先に消費し、次に低優先度を処理
php bin/console messenger:consume async_priority_high async_priority_low \
    --memory-limit=128M \
    --time-limit=3600 \
    --limit=500

3つの制限フラグにより、メモリリークを防止し、ワーカーが定期的に再起動されるようにする。Supervisorはプロセスの終了後に自動的に再起動を行う。

ini
; /etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /var/www/app/bin/console messenger:consume async_priority_high async_priority_low --memory-limit=128M --time-limit=3600
user=www-data
numprocs=2
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true
startsecs=0
stopwaitsecs=30
stdout_logfile=/var/log/messenger-worker.log
stderr_logfile=/var/log/messenger-worker-error.log

numprocs=2を指定すると、2つの並列ワーカーが起動し、スループットが2倍になる。サーバーのCPUコア数とメッセージの処理時間に応じて調整する。

ミドルウェアパイプラインと複数バスによるCQRS

ミドルウェアは全てのメッセージディスパッチをラップし、横断的な関心事を追加する。組み込みのミドルウェアスタックは、バリデーション、Doctrineトランザクション、ルーティングを処理する。

yaml
# config/packages/messenger.yaml
framework:
    messenger:
        default_bus: command.bus
        buses:
            command.bus:
                middleware:
                    - validation
                    - doctrine_transaction
            query.bus:
                middleware:
                    - validation
            event.bus:
                default_middleware:
                    allow_no_handlers: true
                middleware:
                    - validation

この設定はCQRS(コマンドクエリ責務分離)を実装している。コマンドはDoctrineトランザクション内で状態を変更する。クエリは読み取り専用である。イベントはハンドラーがゼロでも許容され、リスナーを独立して追加できるPub/Subパターンを実現する。

Symfonyの面接対策はできていますか?

インタラクティブなシミュレーター、flashcards、技術テストで練習しましょう。

Symfony 7.3の重複排除ミドルウェア

重複メッセージはリソースを浪費し、顧客への二重課金などの副作用を引き起こす可能性がある。Symfony 7.3では、キュー内の同一メッセージを自動的にスキップするDeduplicateMiddlewareが導入された。

src/Message/SendWelcomeEmail.phpphp
namespace App\Message;

use Symfony\Component\Messenger\Stamp\DeduplicateStamp;

final readonly class SendWelcomeEmail
{
    public function __construct(
        public int $userId,
    ) {}
}
php
// 重複排除付きでディスパッチ
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;

$this->bus->dispatch(
    new SendWelcomeEmail(userId: 42),
    [new DeduplicateStamp(id: 'welcome-email-42')],
);

DeduplicateStampはロックリソース識別子を受け取る。同じIDのメッセージが既にキューに存在する場合、新しいディスパッチは暗黙的に破棄される。この機能を使用するには、Lockコンポーネントとシリアライズ可能なストア(Redis、Memcached、またはデータベース)が必要である。

リトライ戦略とフェイルトランスポート

ハンドラーが例外をスローすると、Messengerはトランスポートのリトライ戦略に従ってメッセージを再試行する。リトライ回数を使い切ると、メッセージはフェイルトランスポートに移動される。

src/MessageHandler/PaymentHandler.phpphp
namespace App\MessageHandler;

use App\Message\ProcessPayment;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;

#[AsMessageHandler]
final class PaymentHandler
{
    public function __invoke(ProcessPayment $message): void
    {
        try {
            $this->gateway->charge($message->amount, $message->token);
        } catch (GatewayTimeoutException $e) {
            // 回復可能:バックオフ付きでリトライ
            throw new RecoverableMessageHandlingException(
                'Payment gateway timeout, retrying',
                previous: $e,
            );
        } catch (InvalidCardException $e) {
            // 回復不可能:即座にフェイルトランスポートへ
            throw new UnrecoverableMessageHandlingException(
                'Invalid card, no retry',
                previous: $e,
            );
        }
    }
}

RecoverableMessageHandlingExceptionはリトライ戦略をトリガーする。UnrecoverableMessageHandlingExceptionはリトライを完全にバイパスし、メッセージを直接フェイルトランスポートに送信する。この区別により、永続的に失敗するメッセージに対する無駄なリトライを防ぐことができる。

bash
# 失敗したメッセージの確認と管理
php bin/console messenger:failed:show
php bin/console messenger:failed:show 20 --transport=failed

# 特定のメッセージをリトライ
php bin/console messenger:failed:retry 20 30

# クラスでフィルタして削除(Symfony 7.3+)
php bin/console messenger:failed:remove --class-filter="App\Message\CleanupTempFiles"
冪等なハンドラーの設計

リトライが発生すると、同一メッセージに対してハンドラーが複数回実行される可能性がある。全てのハンドラーを冪等に設計することが重要である。処理を繰り返す前に、作業が既に完了しているかを確認する。データベースのユニーク制約やフラグチェックを使用して、二重処理を防止する。

Doctrineキープアライブと長時間処理メッセージ

長時間実行されるメッセージハンドラーは、トランスポートの可視性タイムアウトが切れた際にメッセージが再配信されるリスクがある。Symfony 7.2ではRedis、SQS、Beanstalkd向けにキープアライブが導入された。Symfony 7.3ではDoctrineトランスポートにも拡張された。

bash
# 長時間処理中の再配信を防ぐためにキープアライブを有効化
php bin/console messenger:consume async --keepalive

--keepaliveフラグは、Doctrineトランスポートテーブルのdelivered_atタイムスタンプを定期的に更新し、ワーカーがまだメッセージを処理中であることを通知する。このフラグがない場合、トランスポートのタイムアウト(デフォルト5分)を超えて処理されているメッセージは、別のワーカーに取得されて重複処理が発生する。

#[AsMessage]属性による宣言的ルーティング

Symfony 7.2では、トランスポートルーティングをYAMLからメッセージクラス自体に移す#[AsMessage]属性が導入された。

src/Message/GenerateReport.phpphp
namespace App\Message;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage(transport: 'async_priority_low')]
final readonly class GenerateReport
{
    public function __construct(
        public int $reportId,
        public string $format = 'pdf',
    ) {}
}

この属性により、対象メッセージのmessenger.yaml内のroutingセクションが不要になる。トランスポートがソースコードで宣言されるため、コードベースが自己文書化される。YAML設定と属性の両方のアプローチは共存可能であり、属性が優先される。

高スループットキュー向けストリーミングAMQPトランスポート

従来のAMQPトランスポートはポーリング(get())でメッセージを取得するため、RabbitMQに不要な負荷をかけていた。2025年にリリースされたストリーミングAMQPトランスポートは、プッシュモデル(consume())に切り替えることで、レイテンシとリソース使用量を削減する。

yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            streaming:
                dsn: 'amqp-lib://guest:guest@localhost:5672/%2f/messages'
                options:
                    exchange:
                        name: app_events
                        type: topic
                    queues:
                        order_events:
                            binding_keys: ['order.*']

デフォルトのAMQPトランスポートとの主な違いは次の通りである。C拡張が不要であること(php-amqplibを使用)、長寿命のTCP接続によるストリーミング配信、バインディングキールーティングによるトピックエクスチェンジのネイティブサポートである。このトランスポートは、最小限のCPUオーバーヘッドで毎秒数千のメッセージを処理できる。

まとめ

  • メッセージにはDoctrineエンティティではなくスカラーIDを渡し、ハンドラーで最新データを取得することで、シリアライズの問題と古いデータの状態を回避する
  • 優先度と重要度に応じてトランスポートを分割し、各トランスポートに独自のリトライ戦略と専用ワーカープールを設定する
  • RecoverableMessageHandlingExceptionUnrecoverableMessageHandlingExceptionを使用して、リトライ動作を明示的に制御する
  • Doctrineトランスポートのワーカーでは--keepaliveを有効にして、長時間処理メッセージの再配信を防ぐ
  • 重複処理が副作用を引き起こすメッセージ(決済、メール、通知)にはDeduplicateStampを適用する
  • 複数バスによるCQRSを実装する。コマンドバスはDoctrineトランザクション付きの変更操作、クエリバスは読み取り、イベントバスはPub/Subに使用する
  • 本番環境ではSupervisorまたはsystemdを使用し、--memory-limit--time-limit--limitフラグでワーカーのライフサイクルを管理する
  • サブミリ秒のレイテンシが求められる高スループットシナリオでは、ストリーミングAMQPトランスポートの採用を検討する

今すぐ練習を始めましょう!

面接シミュレーターと技術テストで知識をテストしましょう。

タグ

#symfony
#messenger
#php
#async
#message queue
#workers

共有

関連記事