Spring Kafka: event-driven architectuur met resiliënte consumers
Volledige Spring Kafka-gids voor event-driven architecturen. Configuratie, resiliënte consumers, retry-strategieën, dead letter queues en productiepatronen voor gedistribueerde applicaties.

Apache Kafka heeft zich gevestigd als de de facto standaard voor grootschalige event-driven architecturen. Spring Kafka vereenvoudigt de integratie ervan in Spring Boot-applicaties en biedt tegelijkertijd essentiële resilience-mechanismen voor productieomgevingen. Deze gids verkent diepgaand de configuratie, consumptiepatronen en strategieën voor foutafhandeling.
Deze gids veronderstelt vertrouwdheid met de basisconcepten van Kafka: topics, partities, consumer groups en offsets. De focus ligt op de Spring-integratie en de resilience-patronen.
Waarom kiezen voor een event-driven architectuur?
Event-driven architecturen ontkoppelen de componenten van een systeem via asynchrone events. In tegenstelling tot synchrone REST-aanroepen sturen producers events zonder op antwoord te wachten, zodat consumers in hun eigen tempo kunnen verwerken.
Deze aanpak biedt verschillende kritieke voordelen: onafhankelijke horizontale schaalbaarheid per service, hogere resilience tegen tijdelijke storingen en volledige traceerbaarheid via het immutable log van Kafka.
public record OrderEvent(
// Unieke identifier van het event voor idempotentie
String eventId,
// Eventtype voor routing
String eventType,
// Tijdstempel van aanmaak
Instant createdAt,
// Business payload
OrderPayload payload
) {
// Factory method die de uniciteit van eventId garandeert
public static OrderEvent created(OrderPayload payload) {
return new OrderEvent(
UUID.randomUUID().toString(),
"ORDER_CREATED",
Instant.now(),
payload
);
}
}
public record OrderPayload(
Long orderId,
Long customerId,
List<OrderItem> items,
BigDecimal totalAmount
) {}De eventstructuur bevat systematisch een unieke identifier, een type en een tijdstempel. Deze metadata maken filtering aan de consumerkant, duplicaatdetectie en temporele tracking mogelijk.
Basisconfiguratie van Spring Kafka
De integratie begint met de spring-kafka-starter en een minimale YAML-configuratie. Spring Boot configureert automatisch de essentiële beans: KafkaTemplate om te produceren en ConcurrentKafkaListenerContainerFactory om te consumeren.
# application.yml
spring:
kafka:
# Adressen van de Kafka-brokers (cluster)
bootstrap-servers: localhost:9092
# Producer-configuratie
producer:
# Serialisatie van String-keys
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# JSON-serialisatie van waarden
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# Wachten op bevestiging van alle replicas
acks: all
# Aantal retries bij netwerkfouten
retries: 3
# Consumer-configuratie
consumer:
# Identificatie van de consumer group
group-id: order-service
# Deserialisatie van keys
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# JSON-deserialisatie met doeltype
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# Startpositie als er geen offset is geregistreerd
auto-offset-reset: earliest
# Auto-commit uitschakelen voor handmatige controle
enable-auto-commit: false
properties:
# Vertrouwd package voor deserialisatie
spring.json.trusted.packages: com.example.eventsenable-auto-commit uitschakelen is een essentiële productiepraktijk. Handmatige offset-commits garanderen dat een bericht pas als verwerkt wordt gemarkeerd nadat de werkelijke verwerking is voltooid.
Een Kafka-producer maken
De KafkaTemplate kapselt de verzendlogica naar Kafka in. Directe injectie maakt onmiddellijk gebruik binnen business services mogelijk.
@Service
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
// Doel-topic (geëxternaliseerd in de configuratie)
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
OrderPayload payload) {
// Aanmaken van het event met metadata
OrderEvent event = OrderEvent.created(payload);
// orderId gebruiken als partition key
// Garandeert de volgorde van events voor dezelfde order
String partitionKey = payload.orderId().toString();
log.info("Publishing ORDER_CREATED event: {} to topic: {}",
event.eventId(), ordersTopic);
// Asynchrone verzending met callback
return kafkaTemplate.send(ordersTopic, partitionKey, event)
.whenComplete((result, ex) -> {
if (ex == null) {
// Succes: log de verzendmetadata
RecordMetadata metadata = result.getRecordMetadata();
log.info("Event sent successfully: topic={}, partition={}, offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
// Fout: log de fout voor onderzoek
log.error("Failed to send event: {}", event.eventId(), ex);
}
});
}
}Een partition key gebruiken die op de business identifier is gebaseerd, zorgt ervoor dat alle events voor dezelfde entiteit op dezelfde partitie terechtkomen, waardoor hun chronologische volgorde behouden blijft.
Een null-key verdeelt berichten round-robin over de partities. Dat maximaliseert het parallellisme maar verliest volgordegaranties. De keuze van de key hangt af van de business-eisen.
Basis-consumer met @KafkaListener
De @KafkaListener-annotatie maakt van een methode een Kafka-consumer. Spring beheert automatisch de polling-lus, deserialisatie en offset-commits.
@Service
@Slf4j
public class OrderEventConsumer {
private final OrderProcessingService processingService;
public OrderEventConsumer(OrderProcessingService processingService) {
this.processingService = processingService;
}
@KafkaListener(
// Topic(s) om naar te luisteren
topics = "${app.kafka.topics.orders}",
// Consumer group
groupId = "${spring.kafka.consumer.group-id}",
// Aangepaste factory voor geavanceerde configuratie
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
// Automatisch gedeserialiseerde payload
OrderEvent event,
// Geïnjecteerde Kafka-metadata
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
// Acknowledgment voor handmatige commit
Acknowledgment acknowledgment) {
log.info("Received event: type={}, partition={}, offset={}",
event.eventType(), partition, offset);
try {
// Business-verwerking
processingService.process(event);
// Offset pas committen na succesvolle verwerking
acknowledgment.acknowledge();
log.info("Event processed successfully: {}", event.eventId());
} catch (Exception ex) {
// Het ontbreken van acknowledge() veroorzaakt herverwerking
log.error("Failed to process event: {}", event.eventId(), ex);
throw ex;
}
}
}Acknowledgment injecteren maakt expliciete controle over het commit mogelijk. Zonder de aanroep van acknowledge() blijft de offset niet-gecommit en wordt het bericht bij de volgende poll opnieuw afgeleverd.
Geavanceerde ConsumerFactory-configuratie
De standaardconfiguratie volstaat voor ontwikkeling, maar vereist aanpassingen voor productie. Een aangepaste factory biedt fijnmazige controle over het consumer-gedrag.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, OrderEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// Verbindingsconfiguratie
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// Deserialisatie
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// Offset-beheer
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Performance: records per poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// Sessie-timeout voor storingsdetectie
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// Heartbeat-interval (1/3 van de sessie-timeout)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
// Maximale verwerkingstijd vóór rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// Configuratie van de JSON-deserializer
JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
deserializer.addTrustedPackages("com.example.events");
deserializer.setUseTypeMapperForKey(false);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
deserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Modus voor handmatige acknowledgment
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Aantal consumer-threads
factory.setConcurrency(3);
// Batch-verwerking uitgeschakeld (één bericht per keer)
factory.setBatchListener(false);
return factory;
}
}De parameter MAX_POLL_INTERVAL_MS_CONFIG definieert de maximale vertraging tussen poll()-aanroepen. Deze overschrijden leidt tot het verwijderen van de consumer uit de groep en een rebalance. Deze waarde moet de maximale verwachte verwerkingstijd weerspiegelen.
Klaar om je Spring Boot gesprekken te halen?
Oefen met onze interactieve simulatoren, flashcards en technische tests.
Retry-strategieën met RetryTemplate
Tijdelijke fouten (kortstondige onbeschikbaarheid van een service, netwerk-timeouts) vereisen automatische retries. Spring Kafka integreert met RetryTemplate om geavanceerde retry-policy's te implementeren.
@Configuration
public class KafkaRetryConfig {
@Bean
public RetryTemplate kafkaRetryTemplate() {
RetryTemplate template = new RetryTemplate();
// Retry-policy: maximaal 3 pogingen
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
template.setRetryPolicy(retryPolicy);
// Exponentiële backoff: 1s, 2s, 4s tussen pogingen
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
retryableKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
RetryTemplate kafkaRetryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Retry-configuratie met recovery-callback
factory.setCommonErrorHandler(
new DefaultErrorHandler(
// Recovery: actie nadat de pogingen zijn uitgeput
(record, exception) -> {
log.error("All retries exhausted for record: key={}, value={}",
record.key(), record.value(), exception);
},
// Exponentiële backoff: 1s initieel, max 30s, 3 pogingen
new ExponentialBackOff(1000L, 2.0)
)
);
return factory;
}
}De DefaultErrorHandler vervangt sinds Spring Kafka 2.8 de oude SeekToCurrentErrorHandler. Hij biedt een duidelijkere API en uitgebreide configuratiemogelijkheden.
Een Dead Letter Queue implementeren
Nadat de pogingen zijn uitgeput, moeten mislukte berichten worden gerouteerd naar een Dead Letter Queue (DLQ) voor latere analyse. Deze aanpak voorkomt gegevensverlies en deblokkeert tegelijk de consumer.
@Configuration
public class DeadLetterConfig {
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
KafkaTemplate<String, Object> kafkaTemplate) {
// Naamgevingsstrategie van het DLT-topic: original-topic.DLT
return new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, exception) -> {
// DLT-topic gebaseerd op het bron-topic
String dltTopic = record.topic() + ".DLT";
log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
dltTopic, record.key(), exception.getMessage());
return new TopicPartition(dltTopic, record.partition());
}
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
dltKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// ErrorHandler met DLT-recovery
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
deadLetterPublishingRecoverer,
new FixedBackOff(1000L, 3)
);
// Niet-herhaalbare excepties (directe verzending naar DLT)
errorHandler.addNotRetryableExceptions(
ValidationException.class,
JsonParseException.class,
NullPointerException.class
);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}Onderscheid maken tussen herhaalbare en niet-herhaalbare excepties optimaliseert het gedrag. Een ValidationException duidt op slecht gevormde gegevens die niet door retries worden hersteld, wat directe routering naar de DLT rechtvaardigt.
DLT-consumer voor handmatige herverwerking
Een dedicated consumer bewaakt de DLT en maakt herverwerking van berichten mogelijk nadat het onderliggende probleem is opgelost.
@Service
@Slf4j
public class DeadLetterConsumer {
private final AlertingService alertingService;
private final FailedEventRepository failedEventRepository;
public DeadLetterConsumer(AlertingService alertingService,
FailedEventRepository failedEventRepository) {
this.alertingService = alertingService;
this.failedEventRepository = failedEventRepository;
}
@KafkaListener(
topics = "${app.kafka.topics.orders}.DLT",
groupId = "order-service-dlt"
)
public void handleDeadLetter(
ConsumerRecord<String, OrderEvent> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
@Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
@Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
@Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
Acknowledgment acknowledgment) {
log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
originalTopic, originalPartition, originalOffset, exceptionMessage);
// Persistentie voor analyse en latere herverwerking
FailedEvent failedEvent = FailedEvent.builder()
.eventId(record.value().eventId())
.eventType(record.value().eventType())
.payload(serializePayload(record.value()))
.originalTopic(originalTopic)
.originalPartition(originalPartition)
.originalOffset(originalOffset)
.exceptionMessage(exceptionMessage)
.stacktrace(stacktrace)
.status(FailedEventStatus.PENDING)
.createdAt(Instant.now())
.build();
failedEventRepository.save(failedEvent);
// Alert voor menselijke interventie
alertingService.notifyDeadLetter(failedEvent);
acknowledgment.acknowledge();
}
}Spring Kafka verrijkt DLT-berichten automatisch met headers die metadata over de fout bevatten: exceptie, stacktrace, oorspronkelijk topic, partitie en offset. Deze informatie vergemakkelijkt de diagnose.
Idempotentieafhandeling aan de consumerkant
Kafka garandeert "at least once"-aflevering: een bericht kan meerdere keren worden afgeleverd als er na verwerking, maar vóór commit, een crash optreedt. Idempotentie aan de consumerkant voorkomt neveneffecten van herverwerking.
@Service
@Slf4j
public class IdempotentOrderProcessor {
private final ProcessedEventRepository processedEventRepository;
private final OrderService orderService;
public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
OrderService orderService) {
this.processedEventRepository = processedEventRepository;
this.orderService = orderService;
}
@Transactional
public void processIdempotent(OrderEvent event) {
String eventId = event.eventId();
// Controle: event al verwerkt?
if (processedEventRepository.existsByEventId(eventId)) {
log.info("Event already processed, skipping: {}", eventId);
return;
}
// Business-verwerking
switch (event.eventType()) {
case "ORDER_CREATED" -> orderService.createOrder(event.payload());
case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
default -> log.warn("Unknown event type: {}", event.eventType());
}
// Verwerking vastleggen in dezelfde transactie
ProcessedEvent processed = ProcessedEvent.builder()
.eventId(eventId)
.eventType(event.eventType())
.processedAt(Instant.now())
.build();
processedEventRepository.save(processed);
log.info("Event processed and recorded: {}", eventId);
}
}@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {
// Index op eventId voor optimale prestaties
boolean existsByEventId(String eventId);
// Opschoning van oude records
@Modifying
@Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
int deleteOlderThan(@Param("before") Instant before);
}De verwerking vastleggen in dezelfde transactie als de business-logica garandeert atomiciteit. Een crash na de verwerking, maar vóór de commit, zal beide bewerkingen consistent opnieuw afspelen.
Transactional Outbox-patroon
Het Outbox-patroon lost consistentieproblemen tussen de database en Kafka op. In plaats van rechtstreeks naar Kafka te publiceren, worden events eerst gepersisteerd in een "outbox"-tabel en daarna door een dedicated proces doorgestuurd.
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String eventId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String eventType;
@Column(columnDefinition = "TEXT", nullable = false)
private String payload;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private OutboxStatus status;
@Column(nullable = false)
private Instant createdAt;
private Instant publishedAt;
}
public enum OutboxStatus {
PENDING, PUBLISHED, FAILED
}@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
public OrderService(OrderRepository orderRepository,
OutboxRepository outboxRepository,
ObjectMapper objectMapper) {
this.orderRepository = orderRepository;
this.outboxRepository = outboxRepository;
this.objectMapper = objectMapper;
}
public Order createOrder(CreateOrderRequest request) {
// De order aanmaken
Order order = Order.builder()
.customerId(request.customerId())
.items(request.items())
.totalAmount(calculateTotal(request.items()))
.status(OrderStatus.CREATED)
.createdAt(Instant.now())
.build();
Order savedOrder = orderRepository.save(order);
// Outbox-event aanmaken in dezelfde transactie
OrderEvent event = OrderEvent.created(toPayload(savedOrder));
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(event.eventId())
.aggregateType("Order")
.aggregateId(savedOrder.getId().toString())
.eventType(event.eventType())
.payload(serialize(event))
.status(OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxRepository.save(outboxEvent);
return savedOrder;
}
private String serialize(OrderEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize event", e);
}
}
}De outbox-relayer publiceert de events asynchroon naar Kafka:
@Component
@Slf4j
public class OutboxRelayScheduler {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OutboxRelayScheduler(OutboxRepository outboxRepository,
KafkaTemplate<String, String> kafkaTemplate) {
this.outboxRepository = outboxRepository;
this.kafkaTemplate = kafkaTemplate;
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void relayPendingEvents() {
// Openstaande events met lock ophalen
List<OutboxEvent> pendingEvents = outboxRepository
.findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
for (OutboxEvent event : pendingEvents) {
try {
// Naar Kafka publiceren
kafkaTemplate.send(
ordersTopic,
event.getAggregateId(),
event.getPayload()
).get(5, TimeUnit.SECONDS);
// Status bijwerken
event.setStatus(OutboxStatus.PUBLISHED);
event.setPublishedAt(Instant.now());
outboxRepository.save(event);
log.debug("Outbox event published: {}", event.getEventId());
} catch (Exception ex) {
log.error("Failed to publish outbox event: {}",
event.getEventId(), ex);
event.setStatus(OutboxStatus.FAILED);
outboxRepository.save(event);
}
}
}
}Dit patroon zorgt ervoor dat een event alleen wordt gepubliceerd als de business-transactie is geslaagd, waardoor inconsistenties tussen database en Kafka worden geëlimineerd.
Begin met oefenen!
Test je kennis met onze gespreksimulatoren en technische tests.
Monitoring en observability
Het bewaken van een Kafka-systeem vereist metrics over consumptielatentie, lag en fouten. Spring Boot Actuator exposeert deze metrics via Micrometer.
@Configuration
public class KafkaMetricsConfig {
@Bean
public MeterBinder kafkaConsumerMetrics(
ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
return registry -> {
// Aangepaste metric: verwerkte events
Counter.builder("kafka.consumer.events.processed")
.description("Number of events successfully processed")
.tag("topic", "orders")
.register(registry);
// Aangepaste metric: mislukte events
Counter.builder("kafka.consumer.events.failed")
.description("Number of events that failed processing")
.tag("topic", "orders")
.register(registry);
};
}
}@Service
@Slf4j
public class InstrumentedOrderConsumer {
private final OrderProcessingService processingService;
private final MeterRegistry meterRegistry;
private final Counter processedCounter;
private final Counter failedCounter;
private final Timer processingTimer;
public InstrumentedOrderConsumer(OrderProcessingService processingService,
MeterRegistry meterRegistry) {
this.processingService = processingService;
this.meterRegistry = meterRegistry;
// Counters initialiseren
this.processedCounter = Counter.builder("kafka.consumer.events.processed")
.tag("topic", "orders")
.register(meterRegistry);
this.failedCounter = Counter.builder("kafka.consumer.events.failed")
.tag("topic", "orders")
.register(meterRegistry);
// Timer om de verwerkingslatentie te meten
this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
.tag("topic", "orders")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
}
@KafkaListener(topics = "${app.kafka.topics.orders}")
public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
processingService.process(event);
acknowledgment.acknowledge();
processedCounter.increment();
sample.stop(processingTimer);
} catch (Exception ex) {
failedCounter.increment();
sample.stop(processingTimer);
throw ex;
}
}
}Met deze metrics kunnen alerts worden geconfigureerd op consumer-lag, foutpercentage en verwerkingslatentie, essentieel voor proactieve probleemdetectie.
Conclusie
Spring Kafka biedt een robuuste integratie om resiliënte event-driven architecturen te bouwen. Het beheersen van retry-mechanismen, dead letter queues en idempotentie vormt de basis van productieklare applicaties.
Checklist event-driven architectuur met Spring Kafka:
- ✅ Handmatige offset-commits configureren met
AckMode.MANUAL - ✅ Een consistente partition key gebruiken om de eventvolgorde te bewaren
- ✅ Retries met exponentiële backoff implementeren via
DefaultErrorHandler - ✅ Mislukte berichten naar een Dead Letter Queue routeren
- ✅ Idempotentie aan de consumerkant garanderen met eventId-tracking
- ✅ Het Outbox-patroon overwegen voor consistentie tussen database en Kafka
- ✅ Metrics exposeren via Micrometer voor monitoring
- ✅
MAX_POLL_INTERVAL_MSafstemmen op de maximale verwerkingstijd
Tags
Delen
Gerelateerde artikelen

Spring Cloud Gateway-sollicitatiegesprek: Routing, Filters en Load Balancing
Beheers Spring Cloud Gateway voor technische sollicitatiegesprekken: 12 vragen over routing, filters, load balancing en API Gateway-patronen met codevoorbeelden.

Spring Boot logging in 2026: gestructureerde logs in productie met Logback en JSON
Volledige gids voor gestructureerde logging in Spring Boot. Logback JSON-configuratie, MDC voor tracing, best practices in productie en integratie met ELK Stack.

Spring GraphQL Sollicitatiegesprek: Resolvers, DataLoaders en Oplossingen voor het N+1-probleem
Voorbereiding op Spring GraphQL sollicitatiegesprekken met deze complete gids. Resolvers, DataLoaders, omgaan met het N+1-probleem, mutaties en best practices voor technische vragen.