Symfony Messenger: Hàng Đợi, Worker và Kiến Trúc Bất Đồng Bộ cho Phỏng Vấn 2026

Hướng dẫn chuyên sâu Symfony Messenger: message bus, transport, worker, middleware chống trùng lặp, chiến lược retry và streaming AMQP trong Symfony 7.3+.

Kiến trúc Symfony Messenger hàng đợi và worker bất đồng bộ

Symfony Messenger là thành phần cốt lõi để xử lý các tác vụ bất đồng bộ trong hệ sinh thái PHP hiện đại. Thành phần này cung cấp một message bus có cấu trúc, các transport linh hoạt và worker được giám sát chặt chẽ. Với Symfony 7.3 và lộ trình phát triển đến phiên bản 8.0, những tính năng mới như middleware chống trùng lặp, streaming AMQP và Doctrine keepalive đã đưa Messenger lên ngang tầm với các hệ thống hàng đợi chuyên dụng như Laravel Horizon hay Sidekiq về mặt tính năng.

Điểm nổi bật của Symfony 7.3+ Messenger

Symfony 7.3 bổ sung DeduplicateMiddleware để tự động loại bỏ message trùng lặp, Doctrine transport keepalive để ngăn chặn việc gửi lại các tác vụ chạy lâu, và thuộc tính #[AsMessage] để khai báo routing transport một cách trực quan.

Kiến Trúc Symfony Messenger: Bus, Transport và Worker

Thành phần Messenger tách biệt ba mối quan tâm chính: dispatching (bus), vận chuyển (transport) và xử lý (worker). Một message là một đối tượng PHP thuần túy. Handler là một lớp có thể gọi được (invokable). Bus kết nối cả hai, định tuyến qua middleware và tùy chọn serialize message đến transport để xử lý bất đồng bộ.

src/Message/InvoiceGenerated.phpphp
namespace App\Message;

final readonly class InvoiceGenerated
{
    public function __construct(
        public int $orderId,
        public string $customerEmail,
    ) {}
}

Message này chỉ chứa dữ liệu scalar, không phải entity Doctrine. Việc truyền ID thay vì đối tượng đầy đủ giúp tránh các vấn đề serialization và giữ cho message nhẹ nhàng. Handler sẽ truy vấn dữ liệu mới nhất từ cơ sở dữ liệu tại thời điểm xử lý.

src/MessageHandler/InvoiceGeneratedHandler.phpphp
namespace App\MessageHandler;

use App\Message\InvoiceGenerated;
use App\Service\InvoiceService;
use App\Service\MailerService;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final readonly class InvoiceGeneratedHandler
{
    public function __construct(
        private InvoiceService $invoiceService,
        private MailerService $mailerService,
    ) {}

    public function __invoke(InvoiceGenerated $message): void
    {
        $pdf = $this->invoiceService->generatePdf($message->orderId);
        $this->mailerService->sendInvoice(
            $message->customerEmail,
            $pdf,
        );
    }
}

Việc dispatch message từ controller hoặc service chỉ cần một dòng code:

src/Controller/OrderController.phpphp
$this->bus->dispatch(new InvoiceGenerated(
    orderId: $order->getId(),
    customerEmail: $order->getCustomer()->getEmail(),
));

Bus sẽ quyết định thực thi đồng bộ hay bất đồng bộ dựa trên cấu hình routing của transport.

Cấu Hình Transport và Các Backend Hàng Đợi

Messenger hỗ trợ nhiều backend khác nhau: Doctrine DBAL, Redis, Amazon SQS, Beanstalkd, AMQP (RabbitMQ) và streaming AMQP transport được giới thiệu vào năm 2025. Mỗi transport được định nghĩa bằng một chuỗi DSN.

yaml
# config/packages/messenger.yaml
framework:
    messenger:
        failure_transport: failed

        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: high
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 3
                    max_delay: 60000

            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low
                retry_strategy:
                    max_retries: 5
                    delay: 5000
                    multiplier: 2

            failed:
                dsn: 'doctrine://default?queue_name=failed'

        routing:
            'App\Message\InvoiceGenerated': async_priority_high
            'App\Message\CleanupTempFiles': async_priority_low

Việc phân tách transport theo mức độ ưu tiên đảm bảo các tác vụ nhạy cảm về thời gian (tạo hóa đơn) được xử lý trước các tác vụ bảo trì (dọn dẹp file tạm). Mỗi transport có chiến lược retry riêng, được điều chỉnh phù hợp với mức độ quan trọng và tính idempotent của message.

Doctrine vs. Redis vs. AMQP

Transport Doctrine không yêu cầu hạ tầng bổ sung nhưng tăng tải cho cơ sở dữ liệu. Redis cung cấp độ trễ dưới mili giây. AMQP (RabbitMQ) hỗ trợ routing nâng cao, dead-letter exchange và streaming transport mới cho các tình huống throughput cao. Việc lựa chọn nên dựa trên hạ tầng hiện có và yêu cầu throughput.

Quản Lý Worker với Supervisor

Worker tiêu thụ message từ transport. Trong môi trường production, lệnh messenger:consume được chạy dưới trình quản lý tiến trình như Supervisor hoặc systemd.

bash
# Tiêu thụ message ưu tiên cao trước, sau đó ưu tiên thấp
php bin/console messenger:consume async_priority_high async_priority_low \
    --memory-limit=128M \
    --time-limit=3600 \
    --limit=500

Ba flag giới hạn này ngăn ngừa rò rỉ bộ nhớ và đảm bảo worker khởi động lại định kỳ. Supervisor sẽ tự động khởi động lại tiến trình sau mỗi lần thoát.

ini
; /etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /var/www/app/bin/console messenger:consume async_priority_high async_priority_low --memory-limit=128M --time-limit=3600
user=www-data
numprocs=2
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true
startsecs=0
stopwaitsecs=30
stdout_logfile=/var/log/messenger-worker.log
stderr_logfile=/var/log/messenger-worker-error.log

Cài đặt numprocs=2 khởi tạo hai worker song song, nhân đôi throughput. Con số này có thể điều chỉnh dựa trên số core CPU của server và thời gian xử lý message.

Pipeline Middleware và CQRS với Nhiều Bus

Middleware bao bọc mỗi lần dispatch message, bổ sung các cross-cutting concern. Stack middleware tích hợp sẵn xử lý validation, Doctrine transaction và routing.

yaml
# config/packages/messenger.yaml
framework:
    messenger:
        default_bus: command.bus
        buses:
            command.bus:
                middleware:
                    - validation
                    - doctrine_transaction
            query.bus:
                middleware:
                    - validation
            event.bus:
                default_middleware:
                    allow_no_handlers: true
                middleware:
                    - validation

Cấu hình này triển khai mô hình CQRS (Command Query Responsibility Segregation). Command thay đổi state bên trong Doctrine transaction. Query chỉ đọc dữ liệu. Event cho phép không có handler nào, tạo điều kiện cho mô hình pub/sub nơi listener có thể được thêm vào một cách độc lập.

Sẵn sàng chinh phục phỏng vấn Symfony?

Luyện tập với mô phỏng tương tác, flashcards và bài kiểm tra kỹ thuật.

Middleware Chống Trùng Lặp trong Symfony 7.3

Các message trùng lặp gây lãng phí tài nguyên và có thể dẫn đến tác dụng phụ như tính phí khách hàng hai lần. Symfony 7.3 giới thiệu DeduplicateMiddleware để tự động bỏ qua các message giống hệt nhau đã có trong hàng đợi.

src/Message/SendWelcomeEmail.phpphp
namespace App\Message;

use Symfony\Component\Messenger\Stamp\DeduplicateStamp;

final readonly class SendWelcomeEmail
{
    public function __construct(
        public int $userId,
    ) {}
}
php
// Dispatch với tính năng chống trùng lặp
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;

$this->bus->dispatch(
    new SendWelcomeEmail(userId: 42),
    [new DeduplicateStamp(id: 'welcome-email-42')],
);

DeduplicateStamp nhận một định danh lock resource. Nếu một message có cùng ID đã đang ở trạng thái pending, lần dispatch mới sẽ bị bỏ qua một cách im lặng. Tính năng này yêu cầu thành phần Lock với store có khả năng serialize (Redis, Memcached hoặc database).

Chiến Lược Retry và Failure Transport

Khi handler ném ra exception, Messenger sẽ thử lại message theo chiến lược retry của transport. Sau khi hết số lần thử, message được chuyển đến failure transport.

src/MessageHandler/PaymentHandler.phpphp
namespace App\MessageHandler;

use App\Message\ProcessPayment;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;

#[AsMessageHandler]
final class PaymentHandler
{
    public function __invoke(ProcessPayment $message): void
    {
        try {
            $this->gateway->charge($message->amount, $message->token);
        } catch (GatewayTimeoutException $e) {
            // Có thể phục hồi: thử lại với backoff
            throw new RecoverableMessageHandlingException(
                'Payment gateway timeout, retrying',
                previous: $e,
            );
        } catch (InvalidCardException $e) {
            // Không thể phục hồi: gửi thẳng đến failure transport
            throw new UnrecoverableMessageHandlingException(
                'Invalid card, no retry',
                previous: $e,
            );
        }
    }
}

RecoverableMessageHandlingException kích hoạt chiến lược retry. UnrecoverableMessageHandlingException bỏ qua hoàn toàn quá trình retry và gửi message trực tiếp đến failure transport. Sự phân biệt này tránh lãng phí lần retry cho những message chắc chắn thất bại.

bash
# Kiểm tra và quản lý message thất bại
php bin/console messenger:failed:show
php bin/console messenger:failed:show 20 --transport=failed

# Thử lại các message cụ thể
php bin/console messenger:failed:retry 20 30

# Lọc và xóa theo class (Symfony 7.3+)
php bin/console messenger:failed:remove --class-filter="App\Message\CleanupTempFiles"
Handler Idempotent

Cơ chế retry đồng nghĩa với việc handler có thể được thực thi nhiều lần cho cùng một message. Mọi handler cần được thiết kế idempotent: kiểm tra xem công việc đã hoàn thành chưa trước khi thực hiện lại. Sử dụng unique constraint trong cơ sở dữ liệu hoặc cờ trạng thái để ngăn chặn xử lý trùng lặp.

Doctrine Keepalive và Message Chạy Lâu

Các handler xử lý message trong thời gian dài có nguy cơ bị gửi lại khi visibility timeout của transport hết hạn. Symfony 7.2 giới thiệu keepalive cho Redis, SQS và Beanstalkd. Symfony 7.3 mở rộng tính năng này cho transport Doctrine.

bash
# Bật keepalive để ngăn gửi lại trong quá trình xử lý lâu
php bin/console messenger:consume async --keepalive

Flag --keepalive định kỳ cập nhật timestamp delivered_at trong bảng transport Doctrine, báo hiệu rằng worker vẫn đang tích cực xử lý message. Không có flag này, một message được xử lý lâu hơn timeout của transport (mặc định 5 phút) sẽ bị worker khác lấy đi, gây ra xử lý trùng lặp.

Thuộc Tính #[AsMessage] cho Routing Khai Báo

Symfony 7.2 giới thiệu thuộc tính #[AsMessage], chuyển routing transport từ YAML sang chính lớp message.

src/Message/GenerateReport.phpphp
namespace App\Message;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage(transport: 'async_priority_low')]
final readonly class GenerateReport
{
    public function __construct(
        public int $reportId,
        public string $format = 'pdf',
    ) {}
}

Cách tiếp cận này loại bỏ phần routing trong messenger.yaml cho message đó. Transport được khai báo ngay tại nguồn, giúp codebase tự mô tả rõ ràng hơn. Cả hai cách (routing YAML và thuộc tính) có thể cùng tồn tại, với thuộc tính được ưu tiên.

Streaming AMQP Transport cho Hàng Đợi Throughput Cao

Transport AMQP truyền thống sử dụng polling (get()) để lấy message, tạo ra tải không cần thiết lên RabbitMQ. Streaming AMQP transport ra mắt năm 2025 chuyển sang mô hình push (consume()), giảm độ trễ và mức sử dụng tài nguyên.

yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            streaming:
                dsn: 'amqp-lib://guest:guest@localhost:5672/%2f/messages'
                options:
                    exchange:
                        name: app_events
                        type: topic
                    queues:
                        order_events:
                            binding_keys: ['order.*']

Điểm khác biệt chính so với transport AMQP mặc định: không cần extension C (sử dụng php-amqplib), vận chuyển streaming qua kết nối TCP duy trì lâu dài, và hỗ trợ native cho topic exchange với routing binding key. Transport này có khả năng xử lý hàng nghìn message mỗi giây với overhead CPU tối thiểu.

Kết Luận

  • Truyền ID scalar trong message, không phải entity Doctrine; truy vấn dữ liệu mới nhất trong handler để tránh vấn đề serialization và state lỗi thời
  • Phân tách transport theo mức ưu tiên và mức độ quan trọng; mỗi transport có chiến lược retry và pool worker riêng
  • Sử dụng RecoverableMessageHandlingExceptionUnrecoverableMessageHandlingException để kiểm soát hành vi retry một cách rõ ràng
  • Bật --keepalive trên worker transport Doctrine để ngăn gửi lại message chạy lâu
  • Áp dụng DeduplicateStamp cho message mà xử lý trùng lặp gây tác dụng phụ (thanh toán, email, thông báo)
  • Triển khai CQRS với nhiều bus: command bus cho mutation với Doctrine transaction, query bus cho đọc dữ liệu, event bus cho pub/sub
  • Sử dụng Supervisor hoặc systemd trong production với các flag --memory-limit, --time-limit--limit để quản lý vòng đời worker
  • Cân nhắc streaming AMQP transport cho tình huống throughput cao yêu cầu độ trễ dưới mili giây

Bắt đầu luyện tập!

Kiểm tra kiến thức với mô phỏng phỏng vấn và bài kiểm tra kỹ thuật.

Thẻ

#symfony
#messenger
#queue
#worker
#async
#php
#interview

Chia sẻ

Bài viết liên quan