Spring Kafka: architettura event-driven con consumer resilienti

Guida completa a Spring Kafka per architetture event-driven. Configurazione, consumer resilienti, politiche di retry, dead letter queue e pattern di produzione per applicazioni distribuite.

Architettura event-driven con Spring Kafka e consumer resilienti

Apache Kafka si è affermato come standard de facto per le architetture event-driven su larga scala. Spring Kafka semplifica la sua integrazione nelle applicazioni Spring Boot offrendo al contempo i meccanismi di resilienza essenziali per gli ambienti di produzione. Questa guida approfondisce configurazione, pattern di consumo e strategie di gestione degli errori.

Prerequisiti

Questa guida presuppone familiarità con i concetti base di Kafka: topic, partizioni, consumer group e offset. Il focus è sull'integrazione Spring e sui pattern di resilienza.

Perché scegliere un'architettura event-driven?

Le architetture event-driven disaccoppiano i componenti di un sistema tramite eventi asincroni. A differenza delle chiamate REST sincrone, i producer emettono eventi senza attendere risposta, permettendo ai consumer di elaborare al proprio ritmo.

Questo approccio offre diversi vantaggi critici: scalabilità orizzontale indipendente per servizio, maggiore resilienza ai guasti temporanei e tracciabilità completa tramite il log immutabile di Kafka.

OrderEvent.javajava
public record OrderEvent(
    // Identificatore univoco dell'evento per idempotenza
    String eventId,
    // Tipo di evento per il routing
    String eventType,
    // Timestamp di creazione
    Instant createdAt,
    // Payload di business
    OrderPayload payload
) {
    // Factory method che garantisce l'unicità di eventId
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

La struttura dell'evento include sistematicamente un identificatore univoco, un tipo e un timestamp. Questi metadati permettono filtraggio lato consumer, rilevamento dei duplicati e tracciamento temporale.

Configurazione base di Spring Kafka

L'integrazione parte dallo starter spring-kafka e da una configurazione YAML minima. Spring Boot configura automaticamente i bean essenziali: KafkaTemplate per la produzione e ConcurrentKafkaListenerContainerFactory per il consumo.

yaml
# application.yml
spring:
  kafka:
    # Indirizzi dei broker Kafka (cluster)
    bootstrap-servers: localhost:9092

    # Configurazione del producer
    producer:
      # Serializzazione delle chiavi String
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializzazione dei valori JSON
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Attesa della conferma di tutte le repliche
      acks: all
      # Numero di tentativi in caso di errore di rete
      retries: 3

    # Configurazione del consumer
    consumer:
      # Identificatore del consumer group
      group-id: order-service
      # Deserializzazione delle chiavi
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializzazione JSON con tipo di destinazione
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Posizione iniziale se non c'è offset registrato
      auto-offset-reset: earliest
      # Disabilitare auto-commit per controllo manuale
      enable-auto-commit: false
      properties:
        # Pacchetto fidato per la deserializzazione
        spring.json.trusted.packages: com.example.events

Disabilitare enable-auto-commit è una pratica essenziale in produzione. I commit manuali degli offset garantiscono che un messaggio venga marcato come elaborato solo dopo l'effettivo completamento dell'elaborazione.

Creare un producer Kafka

Il KafkaTemplate incapsula la logica di invio a Kafka. La sua iniezione diretta permette l'uso immediato all'interno dei servizi di business.

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Topic di destinazione (esternalizzato in configurazione)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // Creazione dell'evento con i metadati
        OrderEvent event = OrderEvent.created(payload);

        // Uso di orderId come chiave di partizione
        // Garantisce l'ordine degli eventi per lo stesso ordine
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // Invio asincrono con callback
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Successo: log dei metadati di invio
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // Errore: log per investigazione
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

L'uso di una chiave di partizione basata sull'identificatore di business garantisce che tutti gli eventi relativi alla stessa entità finiscano nella stessa partizione, preservandone così l'ordine cronologico.

Chiave di partizione

Una chiave nulla distribuisce i messaggi in round-robin tra le partizioni. Massimizza il parallelismo ma perde le garanzie di ordine. La scelta della chiave dipende dai requisiti di business.

Consumer base con @KafkaListener

L'annotazione @KafkaListener trasforma un metodo in un consumer Kafka. Spring gestisce automaticamente il ciclo di polling, la deserializzazione e il commit degli offset.

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // Topic da ascoltare
        topics = "${app.kafka.topics.orders}",
        // Consumer group
        groupId = "${spring.kafka.consumer.group-id}",
        // Factory personalizzata per configurazione avanzata
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // Payload deserializzato automaticamente
            OrderEvent event,
            // Metadati Kafka iniettati
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // Acknowledgment per commit manuale
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // Elaborazione di business
            processingService.process(event);

            // Commit dell'offset solo dopo elaborazione riuscita
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // L'assenza di acknowledge() causa il riprocessing
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

Iniettare Acknowledgment permette il controllo esplicito del commit. Senza la chiamata a acknowledge(), l'offset resta non confermato e il messaggio verrà riconsegnato al prossimo poll.

Configurazione avanzata di ConsumerFactory

La configurazione di default è adatta allo sviluppo ma richiede aggiustamenti per la produzione. Una factory personalizzata offre un controllo fine sul comportamento del consumer.

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Configurazione di connessione
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Deserializzazione
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // Gestione degli offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Performance: record per poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // Timeout di sessione per rilevamento dei guasti
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Intervallo di heartbeat (1/3 del session timeout)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // Tempo massimo di elaborazione prima del rebalance
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // Configurazione del deserializer JSON
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // Modalità di acknowledgment manuale
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Numero di thread consumer
        factory.setConcurrency(3);

        // Elaborazione batch disabilitata (un messaggio alla volta)
        factory.setBatchListener(false);

        return factory;
    }
}

Il parametro MAX_POLL_INTERVAL_MS_CONFIG definisce il ritardo massimo tra le chiamate poll(). Superarlo provoca l'espulsione del consumer dal gruppo e un rebalance. Questo valore deve riflettere il tempo massimo di elaborazione previsto.

Pronto a superare i tuoi colloqui su Spring Boot?

Pratica con i nostri simulatori interattivi, flashcards e test tecnici.

Strategie di retry con RetryTemplate

Gli errori transitori (indisponibilità temporanea di un servizio, timeout di rete) richiedono retry automatici. Spring Kafka si integra con RetryTemplate per implementare politiche di retry sofisticate.

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Politica di retry: 3 tentativi massimi
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Backoff esponenziale: 1s, 2s, 4s tra i tentativi
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Configurazione retry con callback di recovery
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // Recovery: azione dopo l'esaurimento dei tentativi
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // Backoff esponenziale: 1s iniziale, max 30s, 3 tentativi
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

Il DefaultErrorHandler sostituisce il vecchio SeekToCurrentErrorHandler da Spring Kafka 2.8. Offre un'API più chiara e opzioni di configurazione estese.

Implementare una Dead Letter Queue

Una volta esauriti i tentativi, i messaggi falliti devono essere instradati verso una Dead Letter Queue (DLQ) per analisi successiva. Questo approccio evita la perdita di dati sbloccando al contempo il consumer.

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Strategia di naming del topic DLT: original-topic.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // Topic DLT basato sul topic di origine
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // ErrorHandler con recovery DLT
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // Eccezioni non riprovabili (invio diretto al DLT)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

Distinguere tra eccezioni riprovabili e non riprovabili ottimizza il comportamento. Una ValidationException indica dati malformati che non saranno corretti dai retry, giustificando l'invio diretto al DLT.

Consumer DLT per il riprocessing manuale

Un consumer dedicato monitora il DLT e permette il riprocessing dei messaggi dopo aver corretto il problema sottostante.

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // Persistenza per analisi e riprocessing successivo
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // Alert per intervento umano
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
Header DLT

Spring Kafka arricchisce automaticamente i messaggi DLT con header che contengono i metadati del fallimento: eccezione, stacktrace, topic originale, partizione e offset. Queste informazioni facilitano la diagnosi.

Gestione dell'idempotenza lato consumer

Kafka garantisce una consegna "at least once": un messaggio può essere consegnato più volte se si verifica un crash dopo l'elaborazione ma prima del commit. L'idempotenza lato consumer evita gli effetti collaterali del riprocessing.

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // Verifica: evento già elaborato?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // Elaborazione di business
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // Registrare l'elaborazione nella stessa transazione
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // Indice su eventId per performance ottimali
    boolean existsByEventId(String eventId);

    // Pulizia dei record obsoleti
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

Registrare l'elaborazione nella stessa transazione della logica di business garantisce atomicità. Un crash dopo l'elaborazione ma prima del commit ripeterà entrambe le operazioni in modo coerente.

Pattern Outbox transazionale

Il pattern Outbox risolve i problemi di consistenza tra database e Kafka. Invece di pubblicare direttamente su Kafka, gli eventi vengono prima persistiti in una tabella "outbox" e poi rilanciati da un processo dedicato.

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // Creare l'ordine
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // Creare l'evento outbox nella stessa transazione
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

Il relayer outbox pubblica gli eventi su Kafka in modo asincrono:

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // Recuperare gli eventi pendenti con lock
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // Pubblicare su Kafka
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // Aggiornare lo stato
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

Questo pattern assicura che un evento venga pubblicato solo se la transazione di business è riuscita, eliminando le incongruenze tra database e Kafka.

Inizia a praticare!

Metti alla prova le tue conoscenze con i nostri simulatori di colloquio e test tecnici.

Monitoraggio e osservabilità

Supervisionare un sistema Kafka richiede metriche su latenza di consumo, lag ed errori. Spring Boot Actuator espone queste metriche tramite Micrometer.

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // Metrica personalizzata: eventi elaborati
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // Metrica personalizzata: eventi falliti
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // Inizializzare i contatori
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // Timer per misurare la latenza di elaborazione
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

Queste metriche permettono di configurare alert sul lag del consumer, sul tasso di errore e sulla latenza di elaborazione, essenziali per il rilevamento proattivo dei problemi.

Conclusione

Spring Kafka fornisce un'integrazione robusta per costruire architetture event-driven resilienti. Padroneggiare i meccanismi di retry, le dead letter queue e l'idempotenza costituisce la base delle applicazioni production-ready.

Checklist di un'architettura event-driven con Spring Kafka:

  • ✅ Configurare i commit manuali degli offset con AckMode.MANUAL
  • ✅ Usare una chiave di partizione coerente per preservare l'ordine degli eventi
  • ✅ Implementare i retry con backoff esponenziale tramite DefaultErrorHandler
  • ✅ Instradare i messaggi falliti verso una Dead Letter Queue
  • ✅ Garantire l'idempotenza lato consumer con il tracciamento dell'eventId
  • ✅ Considerare il pattern Outbox per la consistenza database/Kafka
  • ✅ Esporre metriche tramite Micrometer per il monitoraggio
  • ✅ Adattare MAX_POLL_INTERVAL_MS al tempo massimo di elaborazione

Tag

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

Condividi

Articoli correlati