Symfony Messenger en 2026: Colas, Workers y Arquitectura Asincrona para Entrevistas Tecnicas

Guia completa de Symfony Messenger: arquitectura del bus de mensajes, configuracion de transportes, gestion de workers, middleware de deduplicacion, estrategias de reintento y transporte AMQP streaming en Symfony 7.3+.

Diagrama de arquitectura de Symfony Messenger con colas asincronas y workers

Symfony Messenger gestiona cargas de trabajo asincronas en PHP mediante un bus de mensajes estructurado, transportes dedicados y workers supervisados. Con la llegada de Symfony 7.3 y las novedades previstas en la rama 8.0, el componente incorpora middleware de deduplicacion, transporte AMQP streaming y keepalive para Doctrine, posicionandose a la altura de sistemas especializados como Laravel Horizon o Sidekiq en cuanto a funcionalidades.

Novedades de Messenger en Symfony 7.3+

Symfony 7.3 incorpora DeduplicateMiddleware para la eliminacion automatica de mensajes duplicados, keepalive en el transporte Doctrine para evitar la reentrega de tareas de larga duracion, y el atributo #[AsMessage] para enrutamiento declarativo de transportes directamente en la clase del mensaje.

Arquitectura de Symfony Messenger: Bus, Transporte y Worker

El componente Messenger separa tres responsabilidades: el despacho (bus), la entrega (transporte) y el procesamiento (worker). Un mensaje es un objeto PHP plano. Un handler es una clase invocable. El bus conecta ambos, enrutando a traves del pipeline de middleware y, opcionalmente, serializando el mensaje hacia un transporte para su procesamiento asincrono.

src/Message/InvoiceGenerated.phpphp
namespace App\Message;

final readonly class InvoiceGenerated
{
    public function __construct(
        public int $orderId,
        public string $customerEmail,
    ) {}
}

Este mensaje transporta unicamente datos escalares, no entidades de Doctrine. Pasar identificadores en lugar de objetos evita problemas de serializacion y mantiene los mensajes livianos. El handler se encarga de consultar los datos actualizados desde la base de datos en el momento del procesamiento.

src/MessageHandler/InvoiceGeneratedHandler.phpphp
namespace App\MessageHandler;

use App\Message\InvoiceGenerated;
use App\Service\InvoiceService;
use App\Service\MailerService;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final readonly class InvoiceGeneratedHandler
{
    public function __construct(
        private InvoiceService $invoiceService,
        private MailerService $mailerService,
    ) {}

    public function __invoke(InvoiceGenerated $message): void
    {
        $pdf = $this->invoiceService->generatePdf($message->orderId);
        $this->mailerService->sendInvoice(
            $message->customerEmail,
            $pdf,
        );
    }
}

Despachar el mensaje desde un controlador o servicio se reduce a una sola linea:

src/Controller/OrderController.phpphp
$this->bus->dispatch(new InvoiceGenerated(
    orderId: $order->getId(),
    customerEmail: $order->getCustomer()->getEmail(),
));

El bus determina si la ejecucion sera sincrona o asincrona segun la configuracion de enrutamiento del transporte.

Configuracion de Transportes y Backends de Cola

Messenger soporta Doctrine DBAL, Redis, Amazon SQS, Beanstalkd, AMQP (RabbitMQ) y el nuevo transporte AMQP streaming presentado en 2025. Cada transporte se define mediante una cadena DSN.

yaml
# config/packages/messenger.yaml
framework:
    messenger:
        failure_transport: failed

        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: high
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 3
                    max_delay: 60000

            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low
                retry_strategy:
                    max_retries: 5
                    delay: 5000
                    multiplier: 2

            failed:
                dsn: 'doctrine://default?queue_name=failed'

        routing:
            'App\Message\InvoiceGenerated': async_priority_high
            'App\Message\CleanupTempFiles': async_priority_low

Separar los transportes por prioridad garantiza que las tareas sensibles al tiempo (generacion de facturas) se procesen antes que las tareas de mantenimiento (limpieza de archivos temporales). Cada transporte cuenta con su propia estrategia de reintento, ajustada a la criticidad e idempotencia de sus mensajes.

Doctrine vs. Redis vs. AMQP

El transporte Doctrine no requiere infraestructura adicional, pero agrega carga a la base de datos. Redis ofrece latencia inferior al milisegundo. AMQP (RabbitMQ) proporciona enrutamiento avanzado, intercambios dead-letter y el nuevo transporte streaming para escenarios de alto rendimiento. La eleccion depende de la infraestructura existente y los requerimientos de throughput.

Gestion de Workers con Supervisor

Los workers consumen mensajes desde los transportes. En produccion, el comando messenger:consume se ejecuta bajo un gestor de procesos como Supervisor o systemd.

bash
# Consume high-priority messages first, then low-priority
php bin/console messenger:consume async_priority_high async_priority_low \
    --memory-limit=128M \
    --time-limit=3600 \
    --limit=500

Las tres banderas de limite previenen fugas de memoria y aseguran que los workers se reinicien periodicamente. Supervisor reinicia el proceso tras cada finalizacion.

ini
; /etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /var/www/app/bin/console messenger:consume async_priority_high async_priority_low --memory-limit=128M --time-limit=3600
user=www-data
numprocs=2
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true
startsecs=0
stopwaitsecs=30
stdout_logfile=/var/log/messenger-worker.log
stderr_logfile=/var/log/messenger-worker-error.log

Establecer numprocs=2 genera dos workers en paralelo, lo que duplica la capacidad de procesamiento. El valor optimo se ajusta segun los nucleos de CPU del servidor y el tiempo promedio de procesamiento de cada mensaje.

Pipeline de Middleware y CQRS con Multiples Buses

El middleware envuelve cada despacho de mensaje, agregando comportamientos transversales. La pila de middleware integrada se encarga de la validacion, las transacciones Doctrine y el enrutamiento.

yaml
# config/packages/messenger.yaml
framework:
    messenger:
        default_bus: command.bus
        buses:
            command.bus:
                middleware:
                    - validation
                    - doctrine_transaction
            query.bus:
                middleware:
                    - validation
            event.bus:
                default_middleware:
                    allow_no_handlers: true
                middleware:
                    - validation

Esta configuracion implementa CQRS (Command Query Responsibility Segregation). Los comandos mutan el estado dentro de una transaccion Doctrine. Las consultas son de solo lectura. Los eventos permiten cero handlers, habilitando un patron pub/sub donde los listeners se pueden agregar de forma independiente. En una entrevista tecnica, demostrar conocimiento sobre la separacion de buses evidencia una comprension solida de patrones arquitectonicos en aplicaciones de gran escala.

¿Listo para aprobar tus entrevistas de Symfony?

Practica con nuestros simuladores interactivos, flashcards y tests técnicos.

Middleware de Deduplicacion en Symfony 7.3

Los mensajes duplicados desperdician recursos y pueden generar efectos secundarios como cobrar dos veces a un cliente. Symfony 7.3 introduce DeduplicateMiddleware para descartar automaticamente mensajes identicos que ya se encuentren encolados.

src/Message/SendWelcomeEmail.phpphp
namespace App\Message;

use Symfony\Component\Messenger\Stamp\DeduplicateStamp;

final readonly class SendWelcomeEmail
{
    public function __construct(
        public int $userId,
    ) {}
}
php
// Dispatching with deduplication
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;

$this->bus->dispatch(
    new SendWelcomeEmail(userId: 42),
    [new DeduplicateStamp(id: 'welcome-email-42')],
);

El DeduplicateStamp recibe un identificador de recurso de bloqueo. Si ya existe un mensaje pendiente con el mismo ID, el nuevo despacho se descarta silenciosamente. Esta funcionalidad requiere el componente Lock con un almacenamiento serializable (Redis, Memcached o base de datos).

Estrategias de Reintento y Transporte de Fallos

Cuando un handler lanza una excepcion, Messenger reintenta el mensaje de acuerdo con la estrategia de reintento configurada en el transporte. Una vez agotados los reintentos, el mensaje se traslada al transporte de fallos (failure transport).

src/MessageHandler/PaymentHandler.phpphp
namespace App\MessageHandler;

use App\Message\ProcessPayment;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;

#[AsMessageHandler]
final class PaymentHandler
{
    public function __invoke(ProcessPayment $message): void
    {
        try {
            $this->gateway->charge($message->amount, $message->token);
        } catch (GatewayTimeoutException $e) {
            // Recoverable: retry with backoff
            throw new RecoverableMessageHandlingException(
                'Payment gateway timeout, retrying',
                previous: $e,
            );
        } catch (InvalidCardException $e) {
            // Unrecoverable: send to failure transport immediately
            throw new UnrecoverableMessageHandlingException(
                'Invalid card, no retry',
                previous: $e,
            );
        }
    }
}

RecoverableMessageHandlingException activa la estrategia de reintento con backoff exponencial. UnrecoverableMessageHandlingException omite todos los reintentos y envia el mensaje directamente al transporte de fallos. Esta distincion evita desperdiciar reintentos en mensajes que presentan errores permanentes.

bash
# Inspect and manage failed messages
php bin/console messenger:failed:show
php bin/console messenger:failed:show 20 --transport=failed

# Retry specific messages
php bin/console messenger:failed:retry 20 30

# Filter and remove by class (Symfony 7.3+)
php bin/console messenger:failed:remove --class-filter="App\Message\CleanupTempFiles"
Handlers Idempotentes

Los reintentos implican que un handler puede ejecutarse multiples veces para el mismo mensaje. Todo handler debe disenarse como idempotente: verificar si el trabajo ya fue realizado antes de repetirlo. Las restricciones de unicidad en la base de datos o banderas de estado son mecanismos efectivos para prevenir el procesamiento duplicado.

Keepalive en Doctrine y Mensajes de Larga Duracion

Los handlers de mensajes con tiempos de ejecucion prolongados corren el riesgo de que sus mensajes sean reentregados cuando el timeout de visibilidad del transporte expira. Symfony 7.2 introdujo keepalive para Redis, SQS y Beanstalkd. Symfony 7.3 extiende esta funcionalidad al transporte Doctrine.

bash
# Enable keepalive to prevent redelivery during long processing
php bin/console messenger:consume async --keepalive

La bandera --keepalive actualiza periodicamente el timestamp delivered_at en la tabla del transporte Doctrine, senalizando que el worker sigue procesando activamente el mensaje. Sin esta bandera, un mensaje cuyo procesamiento supere el timeout del transporte (5 minutos por defecto) sera recogido por otro worker, causando procesamiento duplicado.

Atributo #[AsMessage] para Enrutamiento Declarativo

Symfony 7.2 introdujo el atributo #[AsMessage], que traslada el enrutamiento de transportes desde la configuracion YAML hacia la propia clase del mensaje.

src/Message/GenerateReport.phpphp
namespace App\Message;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage(transport: 'async_priority_low')]
final readonly class GenerateReport
{
    public function __construct(
        public int $reportId,
        public string $format = 'pdf',
    ) {}
}

Esto elimina la seccion routing en messenger.yaml para este mensaje. El transporte queda declarado en el codigo fuente, lo que hace el codebase autodocumentado. Ambos enfoques (enrutamiento YAML y atributo) coexisten, y el atributo tiene precedencia.

Transporte AMQP Streaming para Alto Rendimiento

El transporte AMQP tradicional utiliza polling (get()) para obtener mensajes, generando carga innecesaria sobre RabbitMQ. El transporte AMQP streaming publicado en 2025 cambia a un modelo push (consume()), reduciendo la latencia y el consumo de recursos.

yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            streaming:
                dsn: 'amqp-lib://guest:guest@localhost:5672/%2f/messages'
                options:
                    exchange:
                        name: app_events
                        type: topic
                    queues:
                        order_events:
                            binding_keys: ['order.*']

Diferencias clave respecto al transporte AMQP por defecto: no requiere la extension C (utiliza php-amqplib), realiza la entrega por streaming sobre conexiones TCP de larga duracion, y soporta nativamente intercambios topic con enrutamiento por binding keys. Este transporte procesa miles de mensajes por segundo con una sobrecarga minima de CPU.

Conclusion

  • Pasar IDs escalares en los mensajes, nunca entidades Doctrine; obtener los datos actualizados en el handler para evitar problemas de serializacion y estado obsoleto
  • Separar los transportes por prioridad y criticidad; cada transporte recibe su propia estrategia de reintento y pool de workers dedicado
  • Utilizar RecoverableMessageHandlingException y UnrecoverableMessageHandlingException para controlar el comportamiento de reintento de forma explicita
  • Activar --keepalive en workers con transporte Doctrine para prevenir la reentrega de mensajes de larga duracion
  • Aplicar DeduplicateStamp en mensajes donde el procesamiento duplicado genera efectos secundarios (pagos, correos, notificaciones)
  • Implementar CQRS con multiples buses: bus de comandos para mutaciones con transacciones Doctrine, bus de consultas para lecturas, bus de eventos para pub/sub
  • Usar Supervisor o systemd en produccion con las banderas --memory-limit, --time-limit y --limit para la gestion del ciclo de vida de los workers
  • Considerar el transporte AMQP streaming para escenarios de alto rendimiento que requieran latencia inferior al milisegundo

¡Empieza a practicar!

Pon a prueba tu conocimiento con nuestros simuladores de entrevista y tests técnicos.

Etiquetas

#symfony
#messenger
#php
#async
#message queue
#workers

Compartir

Artículos relacionados