Spring Kafka: architettura event-driven con consumer resilienti
Guida completa a Spring Kafka per architetture event-driven. Configurazione, consumer resilienti, politiche di retry, dead letter queue e pattern di produzione per applicazioni distribuite.

Apache Kafka si è affermato come standard de facto per le architetture event-driven su larga scala. Spring Kafka semplifica la sua integrazione nelle applicazioni Spring Boot offrendo al contempo i meccanismi di resilienza essenziali per gli ambienti di produzione. Questa guida approfondisce configurazione, pattern di consumo e strategie di gestione degli errori.
Questa guida presuppone familiarità con i concetti base di Kafka: topic, partizioni, consumer group e offset. Il focus è sull'integrazione Spring e sui pattern di resilienza.
Perché scegliere un'architettura event-driven?
Le architetture event-driven disaccoppiano i componenti di un sistema tramite eventi asincroni. A differenza delle chiamate REST sincrone, i producer emettono eventi senza attendere risposta, permettendo ai consumer di elaborare al proprio ritmo.
Questo approccio offre diversi vantaggi critici: scalabilità orizzontale indipendente per servizio, maggiore resilienza ai guasti temporanei e tracciabilità completa tramite il log immutabile di Kafka.
public record OrderEvent(
// Identificatore univoco dell'evento per idempotenza
String eventId,
// Tipo di evento per il routing
String eventType,
// Timestamp di creazione
Instant createdAt,
// Payload di business
OrderPayload payload
) {
// Factory method che garantisce l'unicità di eventId
public static OrderEvent created(OrderPayload payload) {
return new OrderEvent(
UUID.randomUUID().toString(),
"ORDER_CREATED",
Instant.now(),
payload
);
}
}
public record OrderPayload(
Long orderId,
Long customerId,
List<OrderItem> items,
BigDecimal totalAmount
) {}La struttura dell'evento include sistematicamente un identificatore univoco, un tipo e un timestamp. Questi metadati permettono filtraggio lato consumer, rilevamento dei duplicati e tracciamento temporale.
Configurazione base di Spring Kafka
L'integrazione parte dallo starter spring-kafka e da una configurazione YAML minima. Spring Boot configura automaticamente i bean essenziali: KafkaTemplate per la produzione e ConcurrentKafkaListenerContainerFactory per il consumo.
# application.yml
spring:
kafka:
# Indirizzi dei broker Kafka (cluster)
bootstrap-servers: localhost:9092
# Configurazione del producer
producer:
# Serializzazione delle chiavi String
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Serializzazione dei valori JSON
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# Attesa della conferma di tutte le repliche
acks: all
# Numero di tentativi in caso di errore di rete
retries: 3
# Configurazione del consumer
consumer:
# Identificatore del consumer group
group-id: order-service
# Deserializzazione delle chiavi
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Deserializzazione JSON con tipo di destinazione
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# Posizione iniziale se non c'è offset registrato
auto-offset-reset: earliest
# Disabilitare auto-commit per controllo manuale
enable-auto-commit: false
properties:
# Pacchetto fidato per la deserializzazione
spring.json.trusted.packages: com.example.eventsDisabilitare enable-auto-commit è una pratica essenziale in produzione. I commit manuali degli offset garantiscono che un messaggio venga marcato come elaborato solo dopo l'effettivo completamento dell'elaborazione.
Creare un producer Kafka
Il KafkaTemplate incapsula la logica di invio a Kafka. La sua iniezione diretta permette l'uso immediato all'interno dei servizi di business.
@Service
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
// Topic di destinazione (esternalizzato in configurazione)
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
OrderPayload payload) {
// Creazione dell'evento con i metadati
OrderEvent event = OrderEvent.created(payload);
// Uso di orderId come chiave di partizione
// Garantisce l'ordine degli eventi per lo stesso ordine
String partitionKey = payload.orderId().toString();
log.info("Publishing ORDER_CREATED event: {} to topic: {}",
event.eventId(), ordersTopic);
// Invio asincrono con callback
return kafkaTemplate.send(ordersTopic, partitionKey, event)
.whenComplete((result, ex) -> {
if (ex == null) {
// Successo: log dei metadati di invio
RecordMetadata metadata = result.getRecordMetadata();
log.info("Event sent successfully: topic={}, partition={}, offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
// Errore: log per investigazione
log.error("Failed to send event: {}", event.eventId(), ex);
}
});
}
}L'uso di una chiave di partizione basata sull'identificatore di business garantisce che tutti gli eventi relativi alla stessa entità finiscano nella stessa partizione, preservandone così l'ordine cronologico.
Una chiave nulla distribuisce i messaggi in round-robin tra le partizioni. Massimizza il parallelismo ma perde le garanzie di ordine. La scelta della chiave dipende dai requisiti di business.
Consumer base con @KafkaListener
L'annotazione @KafkaListener trasforma un metodo in un consumer Kafka. Spring gestisce automaticamente il ciclo di polling, la deserializzazione e il commit degli offset.
@Service
@Slf4j
public class OrderEventConsumer {
private final OrderProcessingService processingService;
public OrderEventConsumer(OrderProcessingService processingService) {
this.processingService = processingService;
}
@KafkaListener(
// Topic da ascoltare
topics = "${app.kafka.topics.orders}",
// Consumer group
groupId = "${spring.kafka.consumer.group-id}",
// Factory personalizzata per configurazione avanzata
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
// Payload deserializzato automaticamente
OrderEvent event,
// Metadati Kafka iniettati
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
// Acknowledgment per commit manuale
Acknowledgment acknowledgment) {
log.info("Received event: type={}, partition={}, offset={}",
event.eventType(), partition, offset);
try {
// Elaborazione di business
processingService.process(event);
// Commit dell'offset solo dopo elaborazione riuscita
acknowledgment.acknowledge();
log.info("Event processed successfully: {}", event.eventId());
} catch (Exception ex) {
// L'assenza di acknowledge() causa il riprocessing
log.error("Failed to process event: {}", event.eventId(), ex);
throw ex;
}
}
}Iniettare Acknowledgment permette il controllo esplicito del commit. Senza la chiamata a acknowledge(), l'offset resta non confermato e il messaggio verrà riconsegnato al prossimo poll.
Configurazione avanzata di ConsumerFactory
La configurazione di default è adatta allo sviluppo ma richiede aggiustamenti per la produzione. Una factory personalizzata offre un controllo fine sul comportamento del consumer.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, OrderEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// Configurazione di connessione
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// Deserializzazione
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// Gestione degli offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Performance: record per poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// Timeout di sessione per rilevamento dei guasti
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// Intervallo di heartbeat (1/3 del session timeout)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
// Tempo massimo di elaborazione prima del rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// Configurazione del deserializer JSON
JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
deserializer.addTrustedPackages("com.example.events");
deserializer.setUseTypeMapperForKey(false);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
deserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Modalità di acknowledgment manuale
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Numero di thread consumer
factory.setConcurrency(3);
// Elaborazione batch disabilitata (un messaggio alla volta)
factory.setBatchListener(false);
return factory;
}
}Il parametro MAX_POLL_INTERVAL_MS_CONFIG definisce il ritardo massimo tra le chiamate poll(). Superarlo provoca l'espulsione del consumer dal gruppo e un rebalance. Questo valore deve riflettere il tempo massimo di elaborazione previsto.
Pronto a superare i tuoi colloqui su Spring Boot?
Pratica con i nostri simulatori interattivi, flashcards e test tecnici.
Strategie di retry con RetryTemplate
Gli errori transitori (indisponibilità temporanea di un servizio, timeout di rete) richiedono retry automatici. Spring Kafka si integra con RetryTemplate per implementare politiche di retry sofisticate.
@Configuration
public class KafkaRetryConfig {
@Bean
public RetryTemplate kafkaRetryTemplate() {
RetryTemplate template = new RetryTemplate();
// Politica di retry: 3 tentativi massimi
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
template.setRetryPolicy(retryPolicy);
// Backoff esponenziale: 1s, 2s, 4s tra i tentativi
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
retryableKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
RetryTemplate kafkaRetryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Configurazione retry con callback di recovery
factory.setCommonErrorHandler(
new DefaultErrorHandler(
// Recovery: azione dopo l'esaurimento dei tentativi
(record, exception) -> {
log.error("All retries exhausted for record: key={}, value={}",
record.key(), record.value(), exception);
},
// Backoff esponenziale: 1s iniziale, max 30s, 3 tentativi
new ExponentialBackOff(1000L, 2.0)
)
);
return factory;
}
}Il DefaultErrorHandler sostituisce il vecchio SeekToCurrentErrorHandler da Spring Kafka 2.8. Offre un'API più chiara e opzioni di configurazione estese.
Implementare una Dead Letter Queue
Una volta esauriti i tentativi, i messaggi falliti devono essere instradati verso una Dead Letter Queue (DLQ) per analisi successiva. Questo approccio evita la perdita di dati sbloccando al contempo il consumer.
@Configuration
public class DeadLetterConfig {
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
KafkaTemplate<String, Object> kafkaTemplate) {
// Strategia di naming del topic DLT: original-topic.DLT
return new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, exception) -> {
// Topic DLT basato sul topic di origine
String dltTopic = record.topic() + ".DLT";
log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
dltTopic, record.key(), exception.getMessage());
return new TopicPartition(dltTopic, record.partition());
}
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
dltKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// ErrorHandler con recovery DLT
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
deadLetterPublishingRecoverer,
new FixedBackOff(1000L, 3)
);
// Eccezioni non riprovabili (invio diretto al DLT)
errorHandler.addNotRetryableExceptions(
ValidationException.class,
JsonParseException.class,
NullPointerException.class
);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}Distinguere tra eccezioni riprovabili e non riprovabili ottimizza il comportamento. Una ValidationException indica dati malformati che non saranno corretti dai retry, giustificando l'invio diretto al DLT.
Consumer DLT per il riprocessing manuale
Un consumer dedicato monitora il DLT e permette il riprocessing dei messaggi dopo aver corretto il problema sottostante.
@Service
@Slf4j
public class DeadLetterConsumer {
private final AlertingService alertingService;
private final FailedEventRepository failedEventRepository;
public DeadLetterConsumer(AlertingService alertingService,
FailedEventRepository failedEventRepository) {
this.alertingService = alertingService;
this.failedEventRepository = failedEventRepository;
}
@KafkaListener(
topics = "${app.kafka.topics.orders}.DLT",
groupId = "order-service-dlt"
)
public void handleDeadLetter(
ConsumerRecord<String, OrderEvent> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
@Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
@Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
@Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
Acknowledgment acknowledgment) {
log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
originalTopic, originalPartition, originalOffset, exceptionMessage);
// Persistenza per analisi e riprocessing successivo
FailedEvent failedEvent = FailedEvent.builder()
.eventId(record.value().eventId())
.eventType(record.value().eventType())
.payload(serializePayload(record.value()))
.originalTopic(originalTopic)
.originalPartition(originalPartition)
.originalOffset(originalOffset)
.exceptionMessage(exceptionMessage)
.stacktrace(stacktrace)
.status(FailedEventStatus.PENDING)
.createdAt(Instant.now())
.build();
failedEventRepository.save(failedEvent);
// Alert per intervento umano
alertingService.notifyDeadLetter(failedEvent);
acknowledgment.acknowledge();
}
}Spring Kafka arricchisce automaticamente i messaggi DLT con header che contengono i metadati del fallimento: eccezione, stacktrace, topic originale, partizione e offset. Queste informazioni facilitano la diagnosi.
Gestione dell'idempotenza lato consumer
Kafka garantisce una consegna "at least once": un messaggio può essere consegnato più volte se si verifica un crash dopo l'elaborazione ma prima del commit. L'idempotenza lato consumer evita gli effetti collaterali del riprocessing.
@Service
@Slf4j
public class IdempotentOrderProcessor {
private final ProcessedEventRepository processedEventRepository;
private final OrderService orderService;
public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
OrderService orderService) {
this.processedEventRepository = processedEventRepository;
this.orderService = orderService;
}
@Transactional
public void processIdempotent(OrderEvent event) {
String eventId = event.eventId();
// Verifica: evento già elaborato?
if (processedEventRepository.existsByEventId(eventId)) {
log.info("Event already processed, skipping: {}", eventId);
return;
}
// Elaborazione di business
switch (event.eventType()) {
case "ORDER_CREATED" -> orderService.createOrder(event.payload());
case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
default -> log.warn("Unknown event type: {}", event.eventType());
}
// Registrare l'elaborazione nella stessa transazione
ProcessedEvent processed = ProcessedEvent.builder()
.eventId(eventId)
.eventType(event.eventType())
.processedAt(Instant.now())
.build();
processedEventRepository.save(processed);
log.info("Event processed and recorded: {}", eventId);
}
}@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {
// Indice su eventId per performance ottimali
boolean existsByEventId(String eventId);
// Pulizia dei record obsoleti
@Modifying
@Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
int deleteOlderThan(@Param("before") Instant before);
}Registrare l'elaborazione nella stessa transazione della logica di business garantisce atomicità. Un crash dopo l'elaborazione ma prima del commit ripeterà entrambe le operazioni in modo coerente.
Pattern Outbox transazionale
Il pattern Outbox risolve i problemi di consistenza tra database e Kafka. Invece di pubblicare direttamente su Kafka, gli eventi vengono prima persistiti in una tabella "outbox" e poi rilanciati da un processo dedicato.
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String eventId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String eventType;
@Column(columnDefinition = "TEXT", nullable = false)
private String payload;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private OutboxStatus status;
@Column(nullable = false)
private Instant createdAt;
private Instant publishedAt;
}
public enum OutboxStatus {
PENDING, PUBLISHED, FAILED
}@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
public OrderService(OrderRepository orderRepository,
OutboxRepository outboxRepository,
ObjectMapper objectMapper) {
this.orderRepository = orderRepository;
this.outboxRepository = outboxRepository;
this.objectMapper = objectMapper;
}
public Order createOrder(CreateOrderRequest request) {
// Creare l'ordine
Order order = Order.builder()
.customerId(request.customerId())
.items(request.items())
.totalAmount(calculateTotal(request.items()))
.status(OrderStatus.CREATED)
.createdAt(Instant.now())
.build();
Order savedOrder = orderRepository.save(order);
// Creare l'evento outbox nella stessa transazione
OrderEvent event = OrderEvent.created(toPayload(savedOrder));
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(event.eventId())
.aggregateType("Order")
.aggregateId(savedOrder.getId().toString())
.eventType(event.eventType())
.payload(serialize(event))
.status(OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxRepository.save(outboxEvent);
return savedOrder;
}
private String serialize(OrderEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize event", e);
}
}
}Il relayer outbox pubblica gli eventi su Kafka in modo asincrono:
@Component
@Slf4j
public class OutboxRelayScheduler {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OutboxRelayScheduler(OutboxRepository outboxRepository,
KafkaTemplate<String, String> kafkaTemplate) {
this.outboxRepository = outboxRepository;
this.kafkaTemplate = kafkaTemplate;
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void relayPendingEvents() {
// Recuperare gli eventi pendenti con lock
List<OutboxEvent> pendingEvents = outboxRepository
.findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
for (OutboxEvent event : pendingEvents) {
try {
// Pubblicare su Kafka
kafkaTemplate.send(
ordersTopic,
event.getAggregateId(),
event.getPayload()
).get(5, TimeUnit.SECONDS);
// Aggiornare lo stato
event.setStatus(OutboxStatus.PUBLISHED);
event.setPublishedAt(Instant.now());
outboxRepository.save(event);
log.debug("Outbox event published: {}", event.getEventId());
} catch (Exception ex) {
log.error("Failed to publish outbox event: {}",
event.getEventId(), ex);
event.setStatus(OutboxStatus.FAILED);
outboxRepository.save(event);
}
}
}
}Questo pattern assicura che un evento venga pubblicato solo se la transazione di business è riuscita, eliminando le incongruenze tra database e Kafka.
Inizia a praticare!
Metti alla prova le tue conoscenze con i nostri simulatori di colloquio e test tecnici.
Monitoraggio e osservabilità
Supervisionare un sistema Kafka richiede metriche su latenza di consumo, lag ed errori. Spring Boot Actuator espone queste metriche tramite Micrometer.
@Configuration
public class KafkaMetricsConfig {
@Bean
public MeterBinder kafkaConsumerMetrics(
ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
return registry -> {
// Metrica personalizzata: eventi elaborati
Counter.builder("kafka.consumer.events.processed")
.description("Number of events successfully processed")
.tag("topic", "orders")
.register(registry);
// Metrica personalizzata: eventi falliti
Counter.builder("kafka.consumer.events.failed")
.description("Number of events that failed processing")
.tag("topic", "orders")
.register(registry);
};
}
}@Service
@Slf4j
public class InstrumentedOrderConsumer {
private final OrderProcessingService processingService;
private final MeterRegistry meterRegistry;
private final Counter processedCounter;
private final Counter failedCounter;
private final Timer processingTimer;
public InstrumentedOrderConsumer(OrderProcessingService processingService,
MeterRegistry meterRegistry) {
this.processingService = processingService;
this.meterRegistry = meterRegistry;
// Inizializzare i contatori
this.processedCounter = Counter.builder("kafka.consumer.events.processed")
.tag("topic", "orders")
.register(meterRegistry);
this.failedCounter = Counter.builder("kafka.consumer.events.failed")
.tag("topic", "orders")
.register(meterRegistry);
// Timer per misurare la latenza di elaborazione
this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
.tag("topic", "orders")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
}
@KafkaListener(topics = "${app.kafka.topics.orders}")
public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
processingService.process(event);
acknowledgment.acknowledge();
processedCounter.increment();
sample.stop(processingTimer);
} catch (Exception ex) {
failedCounter.increment();
sample.stop(processingTimer);
throw ex;
}
}
}Queste metriche permettono di configurare alert sul lag del consumer, sul tasso di errore e sulla latenza di elaborazione, essenziali per il rilevamento proattivo dei problemi.
Conclusione
Spring Kafka fornisce un'integrazione robusta per costruire architetture event-driven resilienti. Padroneggiare i meccanismi di retry, le dead letter queue e l'idempotenza costituisce la base delle applicazioni production-ready.
Checklist di un'architettura event-driven con Spring Kafka:
- ✅ Configurare i commit manuali degli offset con
AckMode.MANUAL - ✅ Usare una chiave di partizione coerente per preservare l'ordine degli eventi
- ✅ Implementare i retry con backoff esponenziale tramite
DefaultErrorHandler - ✅ Instradare i messaggi falliti verso una Dead Letter Queue
- ✅ Garantire l'idempotenza lato consumer con il tracciamento dell'eventId
- ✅ Considerare il pattern Outbox per la consistenza database/Kafka
- ✅ Esporre metriche tramite Micrometer per il monitoraggio
- ✅ Adattare
MAX_POLL_INTERVAL_MSal tempo massimo di elaborazione
Tag
Condividi
Articoli correlati

Colloquio Spring Cloud Gateway: Routing, Filtri e Load Balancing
Padroneggia Spring Cloud Gateway per i colloqui tecnici: 12 domande su routing, filtri, load balancing e pattern API Gateway con esempi di codice.

Logging in Spring Boot 2026: log strutturati in produzione con Logback e JSON
Guida completa al logging strutturato in Spring Boot. Configurazione Logback JSON, MDC per il tracing, best practice in produzione e integrazione con ELK Stack.

Colloquio Spring GraphQL: Resolver, DataLoader e Soluzioni al Problema N+1
Preparazione ai colloqui Spring GraphQL con questa guida completa. Resolver, DataLoader, gestione del problema N+1, mutation e migliori pratiche per le domande tecniche.