Spring Kafka: arquitetura orientada a eventos com consumidores resilientes

Guia completo do Spring Kafka para arquiteturas orientadas a eventos. Configuração, consumidores resilientes, políticas de retry, dead letter queues e padrões de produção para aplicações distribuídas.

Arquitetura orientada a eventos com Spring Kafka e consumidores resilientes

Apache Kafka tornou-se o padrão de facto para arquiteturas orientadas a eventos em larga escala. O Spring Kafka simplifica a sua integração em aplicações Spring Boot ao mesmo tempo que oferece mecanismos de resiliência essenciais para ambientes de produção. Este guia explora em profundidade a configuração, os padrões de consumo e as estratégias de tratamento de erros.

Pré-requisitos

Este guia pressupõe familiaridade com os conceitos básicos do Kafka: tópicos, partições, consumer groups e offsets. O foco está na integração com o Spring e nos padrões de resiliência.

Por que escolher uma arquitetura orientada a eventos?

As arquiteturas orientadas a eventos desacoplam os componentes de um sistema através de eventos assíncronos. Diferentemente das chamadas síncronas REST, os produtores emitem eventos sem aguardar resposta, permitindo que os consumidores processem no seu próprio ritmo.

Essa abordagem traz vários benefícios críticos: escalabilidade horizontal independente por serviço, maior resiliência diante de falhas temporárias e rastreabilidade completa via log imutável do Kafka.

OrderEvent.javajava
public record OrderEvent(
    // Identificador único do evento para idempotência
    String eventId,
    // Tipo de evento para roteamento
    String eventType,
    // Timestamp de criação
    Instant createdAt,
    // Payload de negócio
    OrderPayload payload
) {
    // Factory method que garante a unicidade do eventId
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

A estrutura do evento inclui sistematicamente um identificador único, um tipo e um timestamp. Esses metadados permitem filtragem no consumidor, detecção de duplicatas e rastreamento temporal.

Configuração básica do Spring Kafka

A integração começa com o starter spring-kafka e uma configuração YAML mínima. O Spring Boot configura automaticamente os beans essenciais: KafkaTemplate para produção e ConcurrentKafkaListenerContainerFactory para consumo.

yaml
# application.yml
spring:
  kafka:
    # Endereços dos brokers Kafka (cluster)
    bootstrap-servers: localhost:9092

    # Configuração do produtor
    producer:
      # Serialização de chaves String
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serialização de valores JSON
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Aguardar confirmação de todas as réplicas
      acks: all
      # Número de tentativas em caso de falha de rede
      retries: 3

    # Configuração do consumidor
    consumer:
      # Identificador do consumer group
      group-id: order-service
      # Desserialização de chaves
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Desserialização JSON com tipo de destino
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Posição inicial se não houver offset registrado
      auto-offset-reset: earliest
      # Desabilitar auto-commit para controle manual
      enable-auto-commit: false
      properties:
        # Pacote confiável para desserialização
        spring.json.trusted.packages: com.example.events

Desabilitar enable-auto-commit é uma prática essencial em produção. Os commits manuais de offset garantem que uma mensagem só seja marcada como processada após o término efetivo do processamento.

Criar um produtor Kafka

O KafkaTemplate encapsula a lógica de envio para o Kafka. Sua injeção direta permite o uso imediato dentro dos serviços de negócio.

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Tópico de destino (externalizado na configuração)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // Criação do evento com metadados
        OrderEvent event = OrderEvent.created(payload);

        // Uso do orderId como chave de partição
        // Garante a ordem dos eventos para um mesmo pedido
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // Envio assíncrono com callback
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Sucesso: registrar metadados do envio
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // Falha: registrar o erro para investigação
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

Usar uma chave de partição baseada no identificador de negócio garante que todos os eventos referentes à mesma entidade caiam na mesma partição, preservando assim a ordem cronológica.

Chave de partição

Uma chave nula distribui as mensagens em round-robin entre as partições. Isso maximiza o paralelismo, mas perde as garantias de ordem. A escolha da chave depende dos requisitos de negócio.

Consumidor básico com @KafkaListener

A anotação @KafkaListener transforma um método em consumidor Kafka. O Spring gerencia automaticamente o loop de polling, a desserialização e o commit de offsets.

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // Tópico(s) a escutar
        topics = "${app.kafka.topics.orders}",
        // Consumer group
        groupId = "${spring.kafka.consumer.group-id}",
        // Factory personalizada para configuração avançada
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // Payload desserializado automaticamente
            OrderEvent event,
            // Metadados Kafka injetados
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // Acknowledgment para commit manual
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // Processamento de negócio
            processingService.process(event);

            // Commit do offset apenas após processamento bem-sucedido
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // A ausência de acknowledge() provoca reprocessamento
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

Injetar Acknowledgment permite controlar explicitamente o commit. Sem a chamada a acknowledge(), o offset permanece não confirmado e a mensagem será reentregue no próximo poll.

Configuração avançada do ConsumerFactory

A configuração padrão atende ao desenvolvimento, mas exige ajustes para produção. Uma factory personalizada oferece controle refinado sobre o comportamento do consumidor.

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Configuração de conexão
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Desserialização
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // Gerenciamento de offsets
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Desempenho: registros por poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // Timeout de sessão para detecção de falhas
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Intervalo de heartbeat (1/3 do timeout de sessão)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // Tempo máximo de processamento antes do rebalance
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // Configuração do desserializador JSON
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // Modo de acknowledgment manual
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Número de threads consumidoras
        factory.setConcurrency(3);

        // Processamento em lote desabilitado (uma mensagem por vez)
        factory.setBatchListener(false);

        return factory;
    }
}

O parâmetro MAX_POLL_INTERVAL_MS_CONFIG define o atraso máximo entre chamadas poll(). Excedê-lo provoca a expulsão do consumidor do grupo e um rebalance. Esse valor deve refletir o tempo máximo de processamento esperado.

Pronto para mandar bem nas entrevistas de Spring Boot?

Pratique com nossos simuladores interativos, flashcards e testes tecnicos.

Estratégias de retry com RetryTemplate

Erros transitórios (indisponibilidade temporária de um serviço, timeouts de rede) exigem tentativas automáticas. O Spring Kafka integra-se com RetryTemplate para implementar políticas de retry sofisticadas.

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Política de retry: 3 tentativas máximas
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Backoff exponencial: 1s, 2s, 4s entre tentativas
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Configuração de retry com callback de recuperação
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // Recuperação: ação após esgotadas as tentativas
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // Backoff exponencial: 1s inicial, máx. 30s, 3 tentativas
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

O DefaultErrorHandler substitui o antigo SeekToCurrentErrorHandler desde o Spring Kafka 2.8. Oferece uma API mais clara e opções de configuração ampliadas.

Implementar uma Dead Letter Queue

Após o esgotamento das tentativas, as mensagens com falha devem ser roteadas para uma Dead Letter Queue (DLQ) para análise posterior. Essa abordagem evita perda de dados ao mesmo tempo em que desbloqueia o consumidor.

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Estratégia de nomenclatura do tópico DLT: original-topic.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // Tópico DLT baseado no tópico de origem
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // ErrorHandler com recuperação DLT
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // Exceções não retentáveis (envio direto para DLT)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

Distinguir entre exceções retentáveis e não retentáveis otimiza o comportamento. Uma ValidationException indica dados malformados que não serão corrigidos por novas tentativas, justificando o roteamento direto para a DLT.

Consumidor DLT para reprocessamento manual

Um consumidor dedicado monitora a DLT e permite o reprocessamento de mensagens após a correção do problema subjacente.

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // Persistência para análise e reprocessamento posterior
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // Alerta para intervenção humana
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
Cabeçalhos DLT

O Spring Kafka enriquece automaticamente as mensagens DLT com cabeçalhos contendo metadados da falha: exceção, stacktrace, tópico original, partição e offset. Essas informações facilitam o diagnóstico.

Tratamento da idempotência no consumidor

O Kafka garante entrega "at least once": uma mensagem pode ser entregue várias vezes se ocorrer um crash após o processamento, mas antes do commit. A idempotência no consumidor evita os efeitos colaterais do reprocessamento.

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // Verificação: evento já processado?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // Processamento de negócio
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // Registrar o processamento na mesma transação
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // Índice em eventId para desempenho ideal
    boolean existsByEventId(String eventId);

    // Limpeza de registros antigos
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

Registrar o processamento na mesma transação que a lógica de negócio garante atomicidade. Um crash após o processamento, mas antes do commit, replicará as duas operações de forma consistente.

Padrão Outbox transacional

O padrão Outbox resolve os problemas de consistência entre o banco de dados e o Kafka. Em vez de publicar diretamente no Kafka, os eventos são primeiro persistidos numa tabela "outbox" e depois retransmitidos por um processo dedicado.

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // Criar o pedido
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // Criar o evento outbox na mesma transação
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

O relayer do outbox publica os eventos no Kafka de forma assíncrona:

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // Recuperar eventos pendentes com lock
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // Publicar no Kafka
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // Atualizar status
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

Esse padrão garante que um evento só seja publicado se a transação de negócio for bem-sucedida, eliminando inconsistências entre o banco de dados e o Kafka.

Comece a praticar!

Teste seus conhecimentos com nossos simuladores de entrevista e testes tecnicos.

Monitoramento e observabilidade

Supervisionar um sistema Kafka exige métricas sobre latência de consumo, lag e erros. O Spring Boot Actuator expõe essas métricas via Micrometer.

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // Métrica personalizada: eventos processados
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // Métrica personalizada: eventos com falha
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // Inicializar contadores
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // Timer para medir a latência de processamento
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

Essas métricas permitem configurar alertas sobre o lag do consumidor, a taxa de erros e a latência de processamento, essenciais para a detecção proativa de problemas.

Conclusão

O Spring Kafka oferece uma integração robusta para construir arquiteturas orientadas a eventos resilientes. Dominar os mecanismos de retry, as dead letter queues e a idempotência forma a base de aplicações prontas para produção.

Checklist de arquitetura orientada a eventos com Spring Kafka:

  • ✅ Configurar commits manuais de offset com AckMode.MANUAL
  • ✅ Usar uma chave de partição consistente para preservar a ordem dos eventos
  • ✅ Implementar tentativas com backoff exponencial via DefaultErrorHandler
  • ✅ Rotear mensagens com falha para uma Dead Letter Queue
  • ✅ Garantir idempotência no consumidor com rastreamento de eventId
  • ✅ Considerar o padrão Outbox para a consistência banco/Kafka
  • ✅ Expor métricas via Micrometer para monitoramento
  • ✅ Ajustar MAX_POLL_INTERVAL_MS conforme o tempo máximo de processamento

Tags

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

Compartilhar

Artigos relacionados