Spring Kafka: arquitectura event-driven con consumidores resilientes

Guía completa de Spring Kafka para arquitecturas event-driven. Configuración, consumidores resilientes, políticas de retry, dead letter queues y patrones de producción para aplicaciones distribuidas.

Arquitectura event-driven con Spring Kafka y consumidores resilientes

Apache Kafka se ha convertido en el estándar de facto para arquitecturas event-driven a gran escala. Spring Kafka simplifica su integración en aplicaciones Spring Boot al tiempo que proporciona mecanismos de resiliencia esenciales para entornos de producción. Esta guía explora en profundidad la configuración, los patrones de consumo y las estrategias de manejo de errores.

Requisitos previos

Esta guía asume familiaridad con los conceptos básicos de Kafka: topics, particiones, consumer groups y offsets. El enfoque está en la integración con Spring y los patrones de resiliencia.

¿Por qué elegir una arquitectura event-driven?

Las arquitecturas event-driven desacoplan los componentes de un sistema mediante eventos asíncronos. A diferencia de las llamadas síncronas REST, los productores emiten eventos sin esperar respuesta, permitiendo que los consumidores procesen a su propio ritmo.

Este enfoque aporta varios beneficios críticos: escalabilidad horizontal independiente por servicio, mayor resiliencia frente a fallos temporales y trazabilidad completa a través del log inmutable de Kafka.

OrderEvent.javajava
public record OrderEvent(
    // Identificador único del evento para idempotencia
    String eventId,
    // Tipo de evento para enrutamiento
    String eventType,
    // Timestamp de creación
    Instant createdAt,
    // Payload de negocio
    OrderPayload payload
) {
    // Factory method que garantiza la unicidad del eventId
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

La estructura del evento incluye sistemáticamente un identificador único, un tipo y un timestamp. Estos metadatos permiten el filtrado en el consumidor, la detección de duplicados y el seguimiento temporal.

Configuración básica de Spring Kafka

La integración comienza con el starter spring-kafka y una configuración YAML mínima. Spring Boot autoconfigura los beans esenciales: KafkaTemplate para producir y ConcurrentKafkaListenerContainerFactory para consumir.

yaml
# application.yml
spring:
  kafka:
    # Direcciones de los brokers Kafka (cluster)
    bootstrap-servers: localhost:9092

    # Configuración del productor
    producer:
      # Serialización de claves String
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serialización de valores JSON
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Esperar acuse de todas las réplicas
      acks: all
      # Número de reintentos ante fallo de red
      retries: 3

    # Configuración del consumidor
    consumer:
      # Identificador del consumer group
      group-id: order-service
      # Deserialización de claves
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserialización JSON con tipo de destino
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Posición inicial si no hay offset registrado
      auto-offset-reset: earliest
      # Desactivar auto-commit para control manual
      enable-auto-commit: false
      properties:
        # Paquete de confianza para la deserialización
        spring.json.trusted.packages: com.example.events

Desactivar enable-auto-commit es una práctica esencial en producción. Los commits manuales de offset garantizan que un mensaje se marque como procesado únicamente tras finalizar el procesamiento real.

Crear un productor Kafka

El KafkaTemplate encapsula la lógica de envío a Kafka. Su inyección directa permite un uso inmediato dentro de los servicios de negocio.

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Topic de destino (externalizado en la configuración)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // Creación del evento con metadatos
        OrderEvent event = OrderEvent.created(payload);

        // Uso del orderId como clave de partición
        // Garantiza el orden de los eventos para un mismo pedido
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // Envío asíncrono con callback
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Éxito: registrar metadatos del envío
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // Fallo: registrar el error para investigación
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

Usar una clave de partición basada en el identificador de negocio asegura que todos los eventos relativos a la misma entidad caigan en la misma partición, preservando así su orden cronológico.

Clave de partición

Una clave nula distribuye los mensajes en round-robin entre las particiones. Esto maximiza el paralelismo pero pierde las garantías de orden. La elección de la clave depende de los requisitos de negocio.

Consumidor básico con @KafkaListener

La anotación @KafkaListener transforma un método en un consumidor Kafka. Spring gestiona automáticamente el bucle de polling, la deserialización y el commit de offsets.

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // Topic(s) a escuchar
        topics = "${app.kafka.topics.orders}",
        // Consumer group
        groupId = "${spring.kafka.consumer.group-id}",
        // Factory personalizada para configuración avanzada
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // Payload deserializado automáticamente
            OrderEvent event,
            // Metadatos Kafka inyectados
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // Acknowledgment para commit manual
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // Procesamiento de negocio
            processingService.process(event);

            // Commit del offset solo tras procesamiento exitoso
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // La ausencia de acknowledge() provoca el reprocesamiento
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

Inyectar Acknowledgment permite controlar explícitamente el commit. Sin la llamada a acknowledge(), el offset permanece sin confirmar y el mensaje se redistribuirá en el siguiente poll.

Configuración avanzada del ConsumerFactory

La configuración por defecto resulta adecuada para desarrollo pero requiere ajustes para producción. Una factory personalizada ofrece un control fino sobre el comportamiento del consumidor.

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Configuración de conexión
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Deserialización
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // Gestión de offsets
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Rendimiento: registros por poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // Timeout de sesión para detección de fallos
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Intervalo de heartbeat (1/3 del timeout de sesión)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // Tiempo máximo de procesamiento antes del rebalance
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // Configuración del deserializador JSON
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // Modo de acknowledgment manual
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Número de hilos consumidores
        factory.setConcurrency(3);

        // Procesamiento por lotes desactivado (un mensaje a la vez)
        factory.setBatchListener(false);

        return factory;
    }
}

El parámetro MAX_POLL_INTERVAL_MS_CONFIG define el retardo máximo entre llamadas poll(). Excederlo provoca la expulsión del consumidor del grupo y un rebalance. Este valor debe reflejar el tiempo máximo de procesamiento esperado.

¿Listo para aprobar tus entrevistas de Spring Boot?

Practica con nuestros simuladores interactivos, flashcards y tests técnicos.

Estrategias de reintento con RetryTemplate

Los errores transitorios (indisponibilidad temporal de un servicio, timeouts de red) requieren reintentos automáticos. Spring Kafka se integra con RetryTemplate para implementar políticas de reintento sofisticadas.

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Política de reintento: 3 intentos máximos
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Backoff exponencial: 1s, 2s, 4s entre intentos
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Configuración de reintento con callback de recuperación
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // Recuperación: acción tras agotar los reintentos
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // Backoff exponencial: 1s inicial, máx. 30s, 3 intentos
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

El DefaultErrorHandler reemplaza al antiguo SeekToCurrentErrorHandler desde Spring Kafka 2.8. Ofrece una API más clara y opciones de configuración ampliadas.

Implementar una Dead Letter Queue

Tras agotar los reintentos, los mensajes fallidos deben enrutarse a una Dead Letter Queue (DLQ) para análisis posterior. Este enfoque evita la pérdida de datos al tiempo que desbloquea al consumidor.

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Estrategia de nomenclatura del topic DLT: original-topic.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // Topic DLT basado en el topic origen
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // ErrorHandler con recuperación DLT
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // Excepciones no reintentables (envío directo a DLT)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

Distinguir entre excepciones reintentables y no reintentables optimiza el comportamiento. Una ValidationException indica datos malformados que no se corregirán con reintentos, justificando el enrutamiento directo al DLT.

Consumidor DLT para reprocesamiento manual

Un consumidor dedicado vigila el DLT y permite reprocesar mensajes tras corregir el problema subyacente.

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // Persistencia para análisis y reprocesamiento posterior
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // Alerta para intervención humana
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
Cabeceras DLT

Spring Kafka enriquece automáticamente los mensajes DLT con cabeceras que contienen metadatos del fallo: excepción, stacktrace, topic original, partición y offset. Esta información facilita el diagnóstico.

Gestión de la idempotencia en el consumidor

Kafka garantiza una entrega "at least once": un mensaje puede entregarse varias veces si ocurre un crash tras el procesamiento pero antes del commit. La idempotencia en el consumidor evita los efectos secundarios del reprocesamiento.

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // Verificación: ¿evento ya procesado?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // Procesamiento de negocio
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // Registrar el procesamiento en la misma transacción
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // Índice sobre eventId para rendimiento óptimo
    boolean existsByEventId(String eventId);

    // Limpieza de registros antiguos
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

Registrar el procesamiento en la misma transacción que la lógica de negocio garantiza atomicidad. Un crash tras el procesamiento pero antes del commit reproducirá ambas operaciones de forma consistente.

Patrón Outbox transaccional

El patrón Outbox resuelve los problemas de consistencia entre la base de datos y Kafka. En lugar de publicar directamente en Kafka, los eventos se persisten primero en una tabla "outbox" y luego son retransmitidos por un proceso dedicado.

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // Crear el pedido
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // Crear el evento outbox en la misma transacción
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

El relayer outbox publica los eventos en Kafka de manera asíncrona:

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // Recuperar eventos pendientes con bloqueo
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // Publicar en Kafka
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // Actualizar estado
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

Este patrón asegura que un evento solo se publique si la transacción de negocio tuvo éxito, eliminando inconsistencias entre la base de datos y Kafka.

¡Empieza a practicar!

Pon a prueba tu conocimiento con nuestros simuladores de entrevista y tests técnicos.

Monitorización y observabilidad

Supervisar un sistema Kafka requiere métricas sobre la latencia de consumo, el lag y los errores. Spring Boot Actuator expone estas métricas mediante Micrometer.

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // Métrica personalizada: eventos procesados
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // Métrica personalizada: eventos fallidos
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // Inicializar contadores
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // Timer para medir la latencia de procesamiento
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

Estas métricas permiten configurar alertas sobre el lag del consumidor, la tasa de errores y la latencia de procesamiento, esenciales para la detección proactiva de problemas.

Conclusión

Spring Kafka aporta una integración robusta para construir arquitecturas event-driven resilientes. Dominar los mecanismos de reintento, las dead letter queues y la idempotencia constituye la base de aplicaciones listas para producción.

Checklist de arquitectura event-driven con Spring Kafka:

  • ✅ Configurar commits manuales de offset con AckMode.MANUAL
  • ✅ Usar una clave de partición consistente para preservar el orden de los eventos
  • ✅ Implementar reintentos con backoff exponencial mediante DefaultErrorHandler
  • ✅ Enrutar los mensajes fallidos hacia una Dead Letter Queue
  • ✅ Garantizar la idempotencia en el consumidor con seguimiento de eventId
  • ✅ Considerar el patrón Outbox para la consistencia base de datos/Kafka
  • ✅ Exponer métricas mediante Micrometer para la monitorización
  • ✅ Ajustar MAX_POLL_INTERVAL_MS según el tiempo máximo de procesamiento

Etiquetas

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

Compartir

Artículos relacionados