Symfony Messenger: Antrian, Worker, dan Arsitektur Asinkron untuk Wawancara Kerja 2026

Panduan mendalam Symfony Messenger: message bus, transport, worker, middleware deduplikasi, strategi retry, dan streaming AMQP di Symfony 7.3+.

Symfony Messenger arsitektur antrian dan worker asinkron

Symfony Messenger merupakan komponen inti untuk menangani beban kerja asinkron dalam ekosistem PHP modern. Komponen ini menyediakan message bus terstruktur, transport yang dapat dikonfigurasi, dan worker yang diawasi secara ketat. Dengan hadirnya Symfony 7.3 dan roadmap menuju 8.0, fitur-fitur baru seperti middleware deduplikasi, streaming AMQP, dan Doctrine keepalive menjadikan Messenger setara dengan sistem antrian khusus seperti Laravel Horizon atau Sidekiq dalam hal kelengkapan fitur.

Fitur Baru Symfony 7.3+ Messenger

Symfony 7.3 menghadirkan DeduplicateMiddleware untuk deduplikasi pesan otomatis, Doctrine transport keepalive untuk mencegah pengiriman ulang pada tugas berjalan lama, dan atribut #[AsMessage] untuk routing transport secara deklaratif.

Arsitektur Symfony Messenger: Bus, Transport, dan Worker

Komponen Messenger memisahkan tiga tanggung jawab utama: dispatching (bus), pengiriman (transport), dan pemrosesan (worker). Sebuah message adalah objek PHP biasa. Handler adalah kelas yang dapat dipanggil (invokable). Bus menghubungkan keduanya, merutekan melalui middleware dan secara opsional melakukan serialisasi pesan ke transport untuk pemrosesan asinkron.

src/Message/InvoiceGenerated.phpphp
namespace App\Message;

final readonly class InvoiceGenerated
{
    public function __construct(
        public int $orderId,
        public string $customerEmail,
    ) {}
}

Perhatikan bahwa message ini hanya membawa data skalar, bukan entitas Doctrine. Pengiriman ID alih-alih objek penuh menghindari masalah serialisasi dan menjaga pesan tetap ringan. Handler akan mengambil data terbaru dari database pada saat pemrosesan.

src/MessageHandler/InvoiceGeneratedHandler.phpphp
namespace App\MessageHandler;

use App\Message\InvoiceGenerated;
use App\Service\InvoiceService;
use App\Service\MailerService;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final readonly class InvoiceGeneratedHandler
{
    public function __construct(
        private InvoiceService $invoiceService,
        private MailerService $mailerService,
    ) {}

    public function __invoke(InvoiceGenerated $message): void
    {
        $pdf = $this->invoiceService->generatePdf($message->orderId);
        $this->mailerService->sendInvoice(
            $message->customerEmail,
            $pdf,
        );
    }
}

Proses dispatch dari controller atau service cukup dengan satu baris kode:

src/Controller/OrderController.phpphp
$this->bus->dispatch(new InvoiceGenerated(
    orderId: $order->getId(),
    customerEmail: $order->getCustomer()->getEmail(),
));

Bus akan menentukan apakah eksekusi dilakukan secara sinkron atau asinkron berdasarkan konfigurasi routing transport.

Konfigurasi Transport dan Backend Antrian

Messenger mendukung berbagai backend: Doctrine DBAL, Redis, Amazon SQS, Beanstalkd, AMQP (RabbitMQ), dan streaming AMQP transport yang diperkenalkan pada 2025. Setiap transport didefinisikan melalui DSN string.

yaml
# config/packages/messenger.yaml
framework:
    messenger:
        failure_transport: failed

        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: high
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 3
                    max_delay: 60000

            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low
                retry_strategy:
                    max_retries: 5
                    delay: 5000
                    multiplier: 2

            failed:
                dsn: 'doctrine://default?queue_name=failed'

        routing:
            'App\Message\InvoiceGenerated': async_priority_high
            'App\Message\CleanupTempFiles': async_priority_low

Memisahkan transport berdasarkan prioritas memastikan tugas yang sensitif terhadap waktu (pembuatan invoice) diproses terlebih dahulu sebelum tugas pemeliharaan (pembersihan file sementara). Setiap transport memiliki strategi retry tersendiri yang disesuaikan dengan tingkat kekritisan dan idempotensi pesan-pesannya.

Doctrine vs. Redis vs. AMQP

Transport Doctrine tidak memerlukan infrastruktur tambahan namun menambah beban database. Redis menawarkan latensi sub-milidetik. AMQP (RabbitMQ) menyediakan routing tingkat lanjut, dead-letter exchange, dan streaming transport baru untuk skenario throughput tinggi. Pemilihan sebaiknya didasarkan pada infrastruktur yang sudah ada dan kebutuhan throughput.

Manajemen Worker dengan Supervisor

Worker mengonsumsi pesan dari transport. Di lingkungan produksi, perintah messenger:consume dijalankan di bawah process manager seperti Supervisor atau systemd.

bash
# Konsumsi pesan prioritas tinggi dulu, kemudian prioritas rendah
php bin/console messenger:consume async_priority_high async_priority_low \
    --memory-limit=128M \
    --time-limit=3600 \
    --limit=500

Tiga flag pembatas ini mencegah kebocoran memori dan memastikan worker melakukan restart secara berkala. Supervisor akan memulai ulang proses setelah setiap exit.

ini
; /etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /var/www/app/bin/console messenger:consume async_priority_high async_priority_low --memory-limit=128M --time-limit=3600
user=www-data
numprocs=2
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true
startsecs=0
stopwaitsecs=30
stdout_logfile=/var/log/messenger-worker.log
stderr_logfile=/var/log/messenger-worker-error.log

Pengaturan numprocs=2 menjalankan dua worker paralel sehingga menggandakan throughput. Jumlah ini dapat disesuaikan berdasarkan core CPU server dan waktu pemrosesan pesan.

Pipeline Middleware dan CQRS dengan Multiple Bus

Middleware membungkus setiap dispatch pesan, menambahkan concern lintas-fungsi. Stack middleware bawaan menangani validasi, transaksi Doctrine, dan routing.

yaml
# config/packages/messenger.yaml
framework:
    messenger:
        default_bus: command.bus
        buses:
            command.bus:
                middleware:
                    - validation
                    - doctrine_transaction
            query.bus:
                middleware:
                    - validation
            event.bus:
                default_middleware:
                    allow_no_handlers: true
                middleware:
                    - validation

Konfigurasi ini mengimplementasikan pola CQRS (Command Query Responsibility Segregation). Command mengubah state di dalam transaksi Doctrine. Query bersifat read-only. Event mengizinkan zero handler, memungkinkan pola pub/sub di mana listener dapat ditambahkan secara independen.

Siap menguasai wawancara Symfony Anda?

Berlatih dengan simulator interaktif, flashcards, dan tes teknis kami.

Middleware Deduplikasi di Symfony 7.3

Pesan duplikat memboroskan resource dan dapat menyebabkan efek samping seperti menagih pelanggan dua kali. Symfony 7.3 memperkenalkan DeduplicateMiddleware untuk secara otomatis melewati pesan identik yang sudah berada dalam antrian.

src/Message/SendWelcomeEmail.phpphp
namespace App\Message;

use Symfony\Component\Messenger\Stamp\DeduplicateStamp;

final readonly class SendWelcomeEmail
{
    public function __construct(
        public int $userId,
    ) {}
}
php
// Dispatching dengan deduplikasi
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;

$this->bus->dispatch(
    new SendWelcomeEmail(userId: 42),
    [new DeduplicateStamp(id: 'welcome-email-42')],
);

DeduplicateStamp menerima identifier resource lock. Jika pesan dengan ID yang sama sudah dalam status pending, dispatch baru akan diabaikan secara diam-diam. Fitur ini memerlukan komponen Lock dengan store yang dapat diserialisasi (Redis, Memcached, atau database).

Strategi Retry dan Failure Transport

Ketika handler melempar exception, Messenger mencoba ulang pesan sesuai strategi retry transport. Setelah semua percobaan habis, pesan dipindahkan ke failure transport.

src/MessageHandler/PaymentHandler.phpphp
namespace App\MessageHandler;

use App\Message\ProcessPayment;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;

#[AsMessageHandler]
final class PaymentHandler
{
    public function __invoke(ProcessPayment $message): void
    {
        try {
            $this->gateway->charge($message->amount, $message->token);
        } catch (GatewayTimeoutException $e) {
            // Dapat dipulihkan: coba ulang dengan backoff
            throw new RecoverableMessageHandlingException(
                'Payment gateway timeout, retrying',
                previous: $e,
            );
        } catch (InvalidCardException $e) {
            // Tidak dapat dipulihkan: kirim ke failure transport langsung
            throw new UnrecoverableMessageHandlingException(
                'Invalid card, no retry',
                previous: $e,
            );
        }
    }
}

RecoverableMessageHandlingException memicu strategi retry. UnrecoverableMessageHandlingException melewati retry sepenuhnya dan mengirim pesan langsung ke failure transport. Pembedaan ini mencegah pemborosan retry pada pesan yang sudah pasti gagal.

bash
# Inspeksi dan kelola pesan gagal
php bin/console messenger:failed:show
php bin/console messenger:failed:show 20 --transport=failed

# Coba ulang pesan tertentu
php bin/console messenger:failed:retry 20 30

# Filter dan hapus berdasarkan kelas (Symfony 7.3+)
php bin/console messenger:failed:remove --class-filter="App\Message\CleanupTempFiles"
Handler yang Idempoten

Mekanisme retry berarti handler dapat dieksekusi beberapa kali untuk pesan yang sama. Setiap handler harus dirancang idempoten: periksa apakah pekerjaan sudah selesai sebelum mengulanginya. Gunakan unique constraint pada database atau flag status untuk mencegah pemrosesan ganda.

Doctrine Keepalive dan Pesan Berjalan Lama

Handler yang memproses pesan dalam waktu lama berisiko mengalami pengiriman ulang ketika visibility timeout transport berakhir. Symfony 7.2 memperkenalkan keepalive untuk Redis, SQS, dan Beanstalkd. Symfony 7.3 memperluas fitur ini ke transport Doctrine.

bash
# Aktifkan keepalive untuk mencegah pengiriman ulang selama pemrosesan lama
php bin/console messenger:consume async --keepalive

Flag --keepalive secara berkala memperbarui timestamp delivered_at dalam tabel transport Doctrine, memberi sinyal bahwa worker masih aktif memproses pesan. Tanpa flag ini, pesan yang diproses lebih lama dari timeout transport (default 5 menit) akan diambil oleh worker lain, menyebabkan pemrosesan duplikat.

Atribut #[AsMessage] untuk Routing Deklaratif

Symfony 7.2 memperkenalkan atribut #[AsMessage], memindahkan routing transport dari YAML ke kelas message itu sendiri.

src/Message/GenerateReport.phpphp
namespace App\Message;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage(transport: 'async_priority_low')]
final readonly class GenerateReport
{
    public function __construct(
        public int $reportId,
        public string $format = 'pdf',
    ) {}
}

Pendekatan ini menghilangkan bagian routing di messenger.yaml untuk pesan tersebut. Transport dideklarasikan langsung di sumbernya, menjadikan codebase lebih self-documenting. Kedua pendekatan (routing YAML dan atribut) dapat berdampingan, dengan atribut yang diprioritaskan.

Streaming AMQP Transport untuk Antrian Throughput Tinggi

Transport AMQP tradisional menggunakan polling (get()) untuk mengambil pesan, menghasilkan beban yang tidak perlu pada RabbitMQ. Streaming AMQP transport yang dirilis pada 2025 beralih ke model push (consume()), mengurangi latensi dan penggunaan resource.

yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            streaming:
                dsn: 'amqp-lib://guest:guest@localhost:5672/%2f/messages'
                options:
                    exchange:
                        name: app_events
                        type: topic
                    queues:
                        order_events:
                            binding_keys: ['order.*']

Perbedaan utama dari transport AMQP default: tidak memerlukan ekstensi C (menggunakan php-amqplib), pengiriman streaming melalui koneksi TCP yang bertahan lama, dan dukungan native untuk topic exchange dengan routing binding key. Transport ini mampu menangani ribuan pesan per detik dengan overhead CPU minimal.

Kesimpulan

  • Kirim ID skalar dalam pesan, bukan entitas Doctrine; ambil data terbaru di handler untuk menghindari masalah serialisasi dan state yang kedaluwarsa
  • Pisahkan transport berdasarkan prioritas dan tingkat kekritisan; setiap transport memiliki strategi retry dan pool worker tersendiri
  • Gunakan RecoverableMessageHandlingException dan UnrecoverableMessageHandlingException untuk mengontrol perilaku retry secara eksplisit
  • Aktifkan --keepalive pada worker transport Doctrine untuk mencegah pengiriman ulang pesan berjalan lama
  • Terapkan DeduplicateStamp pada pesan di mana pemrosesan duplikat menyebabkan efek samping (pembayaran, email, notifikasi)
  • Implementasikan CQRS dengan multiple bus: command bus untuk mutasi dengan transaksi Doctrine, query bus untuk pembacaan, event bus untuk pub/sub
  • Gunakan Supervisor atau systemd di produksi dengan flag --memory-limit, --time-limit, dan --limit untuk manajemen siklus hidup worker
  • Pertimbangkan streaming AMQP transport untuk skenario throughput tinggi yang memerlukan latensi sub-milidetik

Mulai berlatih!

Uji pengetahuan Anda dengan simulator wawancara dan tes teknis kami.

Tag

#symfony
#messenger
#queue
#worker
#async
#php
#interview

Bagikan

Artikel terkait