Spring Kafka: event-driven architectuur met resiliënte consumers

Volledige Spring Kafka-gids voor event-driven architecturen. Configuratie, resiliënte consumers, retry-strategieën, dead letter queues en productiepatronen voor gedistribueerde applicaties.

Event-driven architectuur met Spring Kafka en resiliënte consumers

Apache Kafka heeft zich gevestigd als de de facto standaard voor grootschalige event-driven architecturen. Spring Kafka vereenvoudigt de integratie ervan in Spring Boot-applicaties en biedt tegelijkertijd essentiële resilience-mechanismen voor productieomgevingen. Deze gids verkent diepgaand de configuratie, consumptiepatronen en strategieën voor foutafhandeling.

Vereisten

Deze gids veronderstelt vertrouwdheid met de basisconcepten van Kafka: topics, partities, consumer groups en offsets. De focus ligt op de Spring-integratie en de resilience-patronen.

Waarom kiezen voor een event-driven architectuur?

Event-driven architecturen ontkoppelen de componenten van een systeem via asynchrone events. In tegenstelling tot synchrone REST-aanroepen sturen producers events zonder op antwoord te wachten, zodat consumers in hun eigen tempo kunnen verwerken.

Deze aanpak biedt verschillende kritieke voordelen: onafhankelijke horizontale schaalbaarheid per service, hogere resilience tegen tijdelijke storingen en volledige traceerbaarheid via het immutable log van Kafka.

OrderEvent.javajava
public record OrderEvent(
    // Unieke identifier van het event voor idempotentie
    String eventId,
    // Eventtype voor routing
    String eventType,
    // Tijdstempel van aanmaak
    Instant createdAt,
    // Business payload
    OrderPayload payload
) {
    // Factory method die de uniciteit van eventId garandeert
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

De eventstructuur bevat systematisch een unieke identifier, een type en een tijdstempel. Deze metadata maken filtering aan de consumerkant, duplicaatdetectie en temporele tracking mogelijk.

Basisconfiguratie van Spring Kafka

De integratie begint met de spring-kafka-starter en een minimale YAML-configuratie. Spring Boot configureert automatisch de essentiële beans: KafkaTemplate om te produceren en ConcurrentKafkaListenerContainerFactory om te consumeren.

yaml
# application.yml
spring:
  kafka:
    # Adressen van de Kafka-brokers (cluster)
    bootstrap-servers: localhost:9092

    # Producer-configuratie
    producer:
      # Serialisatie van String-keys
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # JSON-serialisatie van waarden
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Wachten op bevestiging van alle replicas
      acks: all
      # Aantal retries bij netwerkfouten
      retries: 3

    # Consumer-configuratie
    consumer:
      # Identificatie van de consumer group
      group-id: order-service
      # Deserialisatie van keys
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # JSON-deserialisatie met doeltype
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Startpositie als er geen offset is geregistreerd
      auto-offset-reset: earliest
      # Auto-commit uitschakelen voor handmatige controle
      enable-auto-commit: false
      properties:
        # Vertrouwd package voor deserialisatie
        spring.json.trusted.packages: com.example.events

enable-auto-commit uitschakelen is een essentiële productiepraktijk. Handmatige offset-commits garanderen dat een bericht pas als verwerkt wordt gemarkeerd nadat de werkelijke verwerking is voltooid.

Een Kafka-producer maken

De KafkaTemplate kapselt de verzendlogica naar Kafka in. Directe injectie maakt onmiddellijk gebruik binnen business services mogelijk.

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Doel-topic (geëxternaliseerd in de configuratie)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // Aanmaken van het event met metadata
        OrderEvent event = OrderEvent.created(payload);

        // orderId gebruiken als partition key
        // Garandeert de volgorde van events voor dezelfde order
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // Asynchrone verzending met callback
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Succes: log de verzendmetadata
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // Fout: log de fout voor onderzoek
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

Een partition key gebruiken die op de business identifier is gebaseerd, zorgt ervoor dat alle events voor dezelfde entiteit op dezelfde partitie terechtkomen, waardoor hun chronologische volgorde behouden blijft.

Partition key

Een null-key verdeelt berichten round-robin over de partities. Dat maximaliseert het parallellisme maar verliest volgordegaranties. De keuze van de key hangt af van de business-eisen.

Basis-consumer met @KafkaListener

De @KafkaListener-annotatie maakt van een methode een Kafka-consumer. Spring beheert automatisch de polling-lus, deserialisatie en offset-commits.

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // Topic(s) om naar te luisteren
        topics = "${app.kafka.topics.orders}",
        // Consumer group
        groupId = "${spring.kafka.consumer.group-id}",
        // Aangepaste factory voor geavanceerde configuratie
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // Automatisch gedeserialiseerde payload
            OrderEvent event,
            // Geïnjecteerde Kafka-metadata
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // Acknowledgment voor handmatige commit
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // Business-verwerking
            processingService.process(event);

            // Offset pas committen na succesvolle verwerking
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // Het ontbreken van acknowledge() veroorzaakt herverwerking
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

Acknowledgment injecteren maakt expliciete controle over het commit mogelijk. Zonder de aanroep van acknowledge() blijft de offset niet-gecommit en wordt het bericht bij de volgende poll opnieuw afgeleverd.

Geavanceerde ConsumerFactory-configuratie

De standaardconfiguratie volstaat voor ontwikkeling, maar vereist aanpassingen voor productie. Een aangepaste factory biedt fijnmazige controle over het consumer-gedrag.

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Verbindingsconfiguratie
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Deserialisatie
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // Offset-beheer
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Performance: records per poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // Sessie-timeout voor storingsdetectie
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Heartbeat-interval (1/3 van de sessie-timeout)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // Maximale verwerkingstijd vóór rebalance
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // Configuratie van de JSON-deserializer
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // Modus voor handmatige acknowledgment
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Aantal consumer-threads
        factory.setConcurrency(3);

        // Batch-verwerking uitgeschakeld (één bericht per keer)
        factory.setBatchListener(false);

        return factory;
    }
}

De parameter MAX_POLL_INTERVAL_MS_CONFIG definieert de maximale vertraging tussen poll()-aanroepen. Deze overschrijden leidt tot het verwijderen van de consumer uit de groep en een rebalance. Deze waarde moet de maximale verwachte verwerkingstijd weerspiegelen.

Klaar om je Spring Boot gesprekken te halen?

Oefen met onze interactieve simulatoren, flashcards en technische tests.

Retry-strategieën met RetryTemplate

Tijdelijke fouten (kortstondige onbeschikbaarheid van een service, netwerk-timeouts) vereisen automatische retries. Spring Kafka integreert met RetryTemplate om geavanceerde retry-policy's te implementeren.

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Retry-policy: maximaal 3 pogingen
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Exponentiële backoff: 1s, 2s, 4s tussen pogingen
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Retry-configuratie met recovery-callback
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // Recovery: actie nadat de pogingen zijn uitgeput
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // Exponentiële backoff: 1s initieel, max 30s, 3 pogingen
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

De DefaultErrorHandler vervangt sinds Spring Kafka 2.8 de oude SeekToCurrentErrorHandler. Hij biedt een duidelijkere API en uitgebreide configuratiemogelijkheden.

Een Dead Letter Queue implementeren

Nadat de pogingen zijn uitgeput, moeten mislukte berichten worden gerouteerd naar een Dead Letter Queue (DLQ) voor latere analyse. Deze aanpak voorkomt gegevensverlies en deblokkeert tegelijk de consumer.

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Naamgevingsstrategie van het DLT-topic: original-topic.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // DLT-topic gebaseerd op het bron-topic
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // ErrorHandler met DLT-recovery
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // Niet-herhaalbare excepties (directe verzending naar DLT)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

Onderscheid maken tussen herhaalbare en niet-herhaalbare excepties optimaliseert het gedrag. Een ValidationException duidt op slecht gevormde gegevens die niet door retries worden hersteld, wat directe routering naar de DLT rechtvaardigt.

DLT-consumer voor handmatige herverwerking

Een dedicated consumer bewaakt de DLT en maakt herverwerking van berichten mogelijk nadat het onderliggende probleem is opgelost.

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // Persistentie voor analyse en latere herverwerking
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // Alert voor menselijke interventie
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
DLT-headers

Spring Kafka verrijkt DLT-berichten automatisch met headers die metadata over de fout bevatten: exceptie, stacktrace, oorspronkelijk topic, partitie en offset. Deze informatie vergemakkelijkt de diagnose.

Idempotentieafhandeling aan de consumerkant

Kafka garandeert "at least once"-aflevering: een bericht kan meerdere keren worden afgeleverd als er na verwerking, maar vóór commit, een crash optreedt. Idempotentie aan de consumerkant voorkomt neveneffecten van herverwerking.

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // Controle: event al verwerkt?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // Business-verwerking
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // Verwerking vastleggen in dezelfde transactie
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // Index op eventId voor optimale prestaties
    boolean existsByEventId(String eventId);

    // Opschoning van oude records
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

De verwerking vastleggen in dezelfde transactie als de business-logica garandeert atomiciteit. Een crash na de verwerking, maar vóór de commit, zal beide bewerkingen consistent opnieuw afspelen.

Transactional Outbox-patroon

Het Outbox-patroon lost consistentieproblemen tussen de database en Kafka op. In plaats van rechtstreeks naar Kafka te publiceren, worden events eerst gepersisteerd in een "outbox"-tabel en daarna door een dedicated proces doorgestuurd.

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // De order aanmaken
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // Outbox-event aanmaken in dezelfde transactie
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

De outbox-relayer publiceert de events asynchroon naar Kafka:

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // Openstaande events met lock ophalen
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // Naar Kafka publiceren
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // Status bijwerken
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

Dit patroon zorgt ervoor dat een event alleen wordt gepubliceerd als de business-transactie is geslaagd, waardoor inconsistenties tussen database en Kafka worden geëlimineerd.

Begin met oefenen!

Test je kennis met onze gespreksimulatoren en technische tests.

Monitoring en observability

Het bewaken van een Kafka-systeem vereist metrics over consumptielatentie, lag en fouten. Spring Boot Actuator exposeert deze metrics via Micrometer.

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // Aangepaste metric: verwerkte events
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // Aangepaste metric: mislukte events
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // Counters initialiseren
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // Timer om de verwerkingslatentie te meten
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

Met deze metrics kunnen alerts worden geconfigureerd op consumer-lag, foutpercentage en verwerkingslatentie, essentieel voor proactieve probleemdetectie.

Conclusie

Spring Kafka biedt een robuuste integratie om resiliënte event-driven architecturen te bouwen. Het beheersen van retry-mechanismen, dead letter queues en idempotentie vormt de basis van productieklare applicaties.

Checklist event-driven architectuur met Spring Kafka:

  • ✅ Handmatige offset-commits configureren met AckMode.MANUAL
  • ✅ Een consistente partition key gebruiken om de eventvolgorde te bewaren
  • ✅ Retries met exponentiële backoff implementeren via DefaultErrorHandler
  • ✅ Mislukte berichten naar een Dead Letter Queue routeren
  • ✅ Idempotentie aan de consumerkant garanderen met eventId-tracking
  • ✅ Het Outbox-patroon overwegen voor consistentie tussen database en Kafka
  • ✅ Metrics exposeren via Micrometer voor monitoring
  • MAX_POLL_INTERVAL_MS afstemmen op de maximale verwerkingstijd

Tags

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

Delen

Gerelateerde artikelen