Symfony Messenger完全解説:キュー、ワーカー、非同期アーキテクチャの面接対策2026
Symfony Messengerのメッセージバスアーキテクチャ、トランスポート設定、ワーカー管理、重複排除ミドルウェア、リトライ戦略、Symfony 7.3+のストリーミングAMQPトランスポートまで徹底解説。2026年の技術面接で問われるポイントを網羅。

Symfony Messengerは、PHPにおける非同期ワークロード処理の中核コンポーネントである。構造化されたメッセージバス、専用トランスポート、監視対象のワーカーを通じて、複雑な非同期処理を管理する。Symfony 7.3および8.0では、重複排除ミドルウェア、ストリーミングAMQPトランスポート、Doctrineキープアライブなどが追加され、Laravel HorizonやSidekiqに匹敵する機能を備えるに至った。
Symfony 7.3では、DeduplicateMiddleware(自動メッセージ重複排除)、Doctrineトランスポートのキープアライブ(長時間処理タスクの再配信防止)、#[AsMessage]属性(宣言的トランスポートルーティング)が追加された。
Messengerのアーキテクチャ:バス、トランスポート、ワーカー
Messengerコンポーネントは3つの関心事を分離している。ディスパッチ(バス)、配信(トランスポート)、処理(ワーカー)である。メッセージはプレーンなPHPオブジェクトであり、ハンドラーは呼び出し可能なクラスである。バスが両者を接続し、ミドルウェアを介してルーティングを行い、必要に応じてメッセージをトランスポートにシリアライズして非同期処理を実現する。
namespace App\Message;
final readonly class InvoiceGenerated
{
public function __construct(
public int $orderId,
public string $customerEmail,
) {}
}このメッセージにはスカラーデータのみが含まれており、Doctrineエンティティは渡していない。オブジェクトではなくIDを渡すことで、シリアライズの問題を回避し、メッセージを軽量に保つことができる。ハンドラーは処理時にデータベースから最新のデータを取得する。
namespace App\MessageHandler;
use App\Message\InvoiceGenerated;
use App\Service\InvoiceService;
use App\Service\MailerService;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
final readonly class InvoiceGeneratedHandler
{
public function __construct(
private InvoiceService $invoiceService,
private MailerService $mailerService,
) {}
public function __invoke(InvoiceGenerated $message): void
{
$pdf = $this->invoiceService->generatePdf($message->orderId);
$this->mailerService->sendInvoice(
$message->customerEmail,
$pdf,
);
}
}コントローラーやサービスからメッセージをディスパッチする方法は非常にシンプルである。
$this->bus->dispatch(new InvoiceGenerated(
orderId: $order->getId(),
customerEmail: $order->getCustomer()->getEmail(),
));同期処理と非同期処理のどちらで実行するかは、トランスポートルーティングの設定によって決まる。
トランスポート設定とキューバックエンド
Messengerは、Doctrine DBAL、Redis、Amazon SQS、Beanstalkd、AMQP(RabbitMQ)、および2025年に導入されたストリーミングAMQPトランスポートをサポートしている。各トランスポートはDSN文字列で定義される。
# config/packages/messenger.yaml
framework:
messenger:
failure_transport: failed
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
queue_name: high
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 3
max_delay: 60000
async_priority_low:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
queue_name: low
retry_strategy:
max_retries: 5
delay: 5000
multiplier: 2
failed:
dsn: 'doctrine://default?queue_name=failed'
routing:
'App\Message\InvoiceGenerated': async_priority_high
'App\Message\CleanupTempFiles': async_priority_low優先度別にトランスポートを分割することで、時間に敏感なタスク(請求書生成)がハウスキーピングタスク(一時ファイル削除)より先に処理されることが保証される。各トランスポートには独自のリトライ戦略が設定され、メッセージの重要度と冪等性に応じて調整される。
Doctrineトランスポートは追加のインフラストラクチャが不要だが、データベースへの負荷が増加する。Redisはサブミリ秒のレイテンシを提供する。AMQP(RabbitMQ)は高度なルーティング、デッドレターエクスチェンジ、高スループットシナリオ向けのストリーミングトランスポートを備えている。既存のインフラストラクチャとスループット要件に基づいて選択するのが望ましい。
Supervisorによるワーカー管理
ワーカーはトランスポートからメッセージを消費する。本番環境では、messenger:consumeコマンドをSupervisorやsystemdなどのプロセスマネージャーの下で実行する。
# 高優先度メッセージを先に消費し、次に低優先度を処理
php bin/console messenger:consume async_priority_high async_priority_low \
--memory-limit=128M \
--time-limit=3600 \
--limit=5003つの制限フラグにより、メモリリークを防止し、ワーカーが定期的に再起動されるようにする。Supervisorはプロセスの終了後に自動的に再起動を行う。
; /etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /var/www/app/bin/console messenger:consume async_priority_high async_priority_low --memory-limit=128M --time-limit=3600
user=www-data
numprocs=2
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true
startsecs=0
stopwaitsecs=30
stdout_logfile=/var/log/messenger-worker.log
stderr_logfile=/var/log/messenger-worker-error.lognumprocs=2を指定すると、2つの並列ワーカーが起動し、スループットが2倍になる。サーバーのCPUコア数とメッセージの処理時間に応じて調整する。
ミドルウェアパイプラインと複数バスによるCQRS
ミドルウェアは全てのメッセージディスパッチをラップし、横断的な関心事を追加する。組み込みのミドルウェアスタックは、バリデーション、Doctrineトランザクション、ルーティングを処理する。
# config/packages/messenger.yaml
framework:
messenger:
default_bus: command.bus
buses:
command.bus:
middleware:
- validation
- doctrine_transaction
query.bus:
middleware:
- validation
event.bus:
default_middleware:
allow_no_handlers: true
middleware:
- validationこの設定はCQRS(コマンドクエリ責務分離)を実装している。コマンドはDoctrineトランザクション内で状態を変更する。クエリは読み取り専用である。イベントはハンドラーがゼロでも許容され、リスナーを独立して追加できるPub/Subパターンを実現する。
Symfonyの面接対策はできていますか?
インタラクティブなシミュレーター、flashcards、技術テストで練習しましょう。
Symfony 7.3の重複排除ミドルウェア
重複メッセージはリソースを浪費し、顧客への二重課金などの副作用を引き起こす可能性がある。Symfony 7.3では、キュー内の同一メッセージを自動的にスキップするDeduplicateMiddlewareが導入された。
namespace App\Message;
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
final readonly class SendWelcomeEmail
{
public function __construct(
public int $userId,
) {}
}// 重複排除付きでディスパッチ
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
$this->bus->dispatch(
new SendWelcomeEmail(userId: 42),
[new DeduplicateStamp(id: 'welcome-email-42')],
);DeduplicateStampはロックリソース識別子を受け取る。同じIDのメッセージが既にキューに存在する場合、新しいディスパッチは暗黙的に破棄される。この機能を使用するには、Lockコンポーネントとシリアライズ可能なストア(Redis、Memcached、またはデータベース)が必要である。
リトライ戦略とフェイルトランスポート
ハンドラーが例外をスローすると、Messengerはトランスポートのリトライ戦略に従ってメッセージを再試行する。リトライ回数を使い切ると、メッセージはフェイルトランスポートに移動される。
namespace App\MessageHandler;
use App\Message\ProcessPayment;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
#[AsMessageHandler]
final class PaymentHandler
{
public function __invoke(ProcessPayment $message): void
{
try {
$this->gateway->charge($message->amount, $message->token);
} catch (GatewayTimeoutException $e) {
// 回復可能:バックオフ付きでリトライ
throw new RecoverableMessageHandlingException(
'Payment gateway timeout, retrying',
previous: $e,
);
} catch (InvalidCardException $e) {
// 回復不可能:即座にフェイルトランスポートへ
throw new UnrecoverableMessageHandlingException(
'Invalid card, no retry',
previous: $e,
);
}
}
}RecoverableMessageHandlingExceptionはリトライ戦略をトリガーする。UnrecoverableMessageHandlingExceptionはリトライを完全にバイパスし、メッセージを直接フェイルトランスポートに送信する。この区別により、永続的に失敗するメッセージに対する無駄なリトライを防ぐことができる。
# 失敗したメッセージの確認と管理
php bin/console messenger:failed:show
php bin/console messenger:failed:show 20 --transport=failed
# 特定のメッセージをリトライ
php bin/console messenger:failed:retry 20 30
# クラスでフィルタして削除(Symfony 7.3+)
php bin/console messenger:failed:remove --class-filter="App\Message\CleanupTempFiles"リトライが発生すると、同一メッセージに対してハンドラーが複数回実行される可能性がある。全てのハンドラーを冪等に設計することが重要である。処理を繰り返す前に、作業が既に完了しているかを確認する。データベースのユニーク制約やフラグチェックを使用して、二重処理を防止する。
Doctrineキープアライブと長時間処理メッセージ
長時間実行されるメッセージハンドラーは、トランスポートの可視性タイムアウトが切れた際にメッセージが再配信されるリスクがある。Symfony 7.2ではRedis、SQS、Beanstalkd向けにキープアライブが導入された。Symfony 7.3ではDoctrineトランスポートにも拡張された。
# 長時間処理中の再配信を防ぐためにキープアライブを有効化
php bin/console messenger:consume async --keepalive--keepaliveフラグは、Doctrineトランスポートテーブルのdelivered_atタイムスタンプを定期的に更新し、ワーカーがまだメッセージを処理中であることを通知する。このフラグがない場合、トランスポートのタイムアウト(デフォルト5分)を超えて処理されているメッセージは、別のワーカーに取得されて重複処理が発生する。
#[AsMessage]属性による宣言的ルーティング
Symfony 7.2では、トランスポートルーティングをYAMLからメッセージクラス自体に移す#[AsMessage]属性が導入された。
namespace App\Message;
use Symfony\Component\Messenger\Attribute\AsMessage;
#[AsMessage(transport: 'async_priority_low')]
final readonly class GenerateReport
{
public function __construct(
public int $reportId,
public string $format = 'pdf',
) {}
}この属性により、対象メッセージのmessenger.yaml内のroutingセクションが不要になる。トランスポートがソースコードで宣言されるため、コードベースが自己文書化される。YAML設定と属性の両方のアプローチは共存可能であり、属性が優先される。
高スループットキュー向けストリーミングAMQPトランスポート
従来のAMQPトランスポートはポーリング(get())でメッセージを取得するため、RabbitMQに不要な負荷をかけていた。2025年にリリースされたストリーミングAMQPトランスポートは、プッシュモデル(consume())に切り替えることで、レイテンシとリソース使用量を削減する。
# config/packages/messenger.yaml
framework:
messenger:
transports:
streaming:
dsn: 'amqp-lib://guest:guest@localhost:5672/%2f/messages'
options:
exchange:
name: app_events
type: topic
queues:
order_events:
binding_keys: ['order.*']デフォルトのAMQPトランスポートとの主な違いは次の通りである。C拡張が不要であること(php-amqplibを使用)、長寿命のTCP接続によるストリーミング配信、バインディングキールーティングによるトピックエクスチェンジのネイティブサポートである。このトランスポートは、最小限のCPUオーバーヘッドで毎秒数千のメッセージを処理できる。
まとめ
- メッセージにはDoctrineエンティティではなくスカラーIDを渡し、ハンドラーで最新データを取得することで、シリアライズの問題と古いデータの状態を回避する
- 優先度と重要度に応じてトランスポートを分割し、各トランスポートに独自のリトライ戦略と専用ワーカープールを設定する
RecoverableMessageHandlingExceptionとUnrecoverableMessageHandlingExceptionを使用して、リトライ動作を明示的に制御する- Doctrineトランスポートのワーカーでは
--keepaliveを有効にして、長時間処理メッセージの再配信を防ぐ - 重複処理が副作用を引き起こすメッセージ(決済、メール、通知)には
DeduplicateStampを適用する - 複数バスによるCQRSを実装する。コマンドバスはDoctrineトランザクション付きの変更操作、クエリバスは読み取り、イベントバスはPub/Subに使用する
- 本番環境ではSupervisorまたはsystemdを使用し、
--memory-limit、--time-limit、--limitフラグでワーカーのライフサイクルを管理する - サブミリ秒のレイテンシが求められる高スループットシナリオでは、ストリーミングAMQPトランスポートの採用を検討する
今すぐ練習を始めましょう!
面接シミュレーターと技術テストで知識をテストしましょう。
タグ
共有
関連記事

Symfony 8の新機能を徹底解説:PHP 8.4レイジーオブジェクト、マルチステップフォーム、面接対策まで
Symfony 8はPHP 8.4を必須とし、ネイティブレイジーオブジェクト、AbstractFlowType、呼び出し可能コマンドなど多数の新機能を搭載しています。本記事では主要機能をコード例とともに解説し、2026年の面接対策ポイントも紹介します。

Doctrine ORM:Symfonyにおけるリレーションのマスター
SymfonyにおけるDoctrine ORMリレーションの完全ガイド。OneToMany、ManyToMany、ロード戦略、パフォーマンス最適化を実例とともに解説します。

Symfony面接質問集: 2026年トップ25
最も多く尋ねられるSymfony面接質問25選。アーキテクチャ、Doctrine ORM、サービス、セキュリティ、フォーム、テストを詳細な回答とコード例とともに解説。