Spring Kafka : architecture event-driven avec consumers résilients

Guide complet Spring Kafka pour architectures event-driven. Configuration, consumers résilients, retry policies, dead letter queues et patterns de production pour applications distribuées.

Architecture event-driven avec Spring Kafka et consumers résilients

Apache Kafka s'impose comme le standard de facto pour les architectures event-driven à grande échelle. Spring Kafka simplifie son intégration dans les applications Spring Boot tout en offrant des mécanismes de résilience essentiels pour la production. Ce guide explore en profondeur la configuration, les patterns de consommation et les stratégies de gestion d'erreurs.

Prérequis

Ce guide suppose une familiarité avec les concepts de base Kafka : topics, partitions, consumer groups et offsets. L'accent porte sur l'intégration Spring et les patterns de résilience.

Pourquoi choisir une architecture event-driven ?

Les architectures event-driven découplent les composants d'un système en utilisant des événements asynchrones. Contrairement aux appels synchrones REST, les producteurs émettent des événements sans attendre de réponse, permettant aux consommateurs de traiter à leur propre rythme.

Cette approche apporte plusieurs avantages critiques : scalabilité horizontale indépendante de chaque service, résilience accrue face aux pannes temporaires, et traçabilité complète via le log immutable de Kafka.

OrderEvent.javajava
public record OrderEvent(
    // Identifiant unique de l'événement pour idempotence
    String eventId,
    // Type d'événement pour le routage
    String eventType,
    // Timestamp de création
    Instant createdAt,
    // Payload métier
    OrderPayload payload
) {
    // Factory method garantissant l'unicité de l'eventId
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

La structure d'événement inclut systématiquement un identifiant unique, un type et un timestamp. Ces métadonnées permettent le filtrage côté consommateur, la détection des doublons et le suivi temporel.

Configuration de base Spring Kafka

L'intégration démarre avec le starter spring-kafka et une configuration YAML minimale. Spring Boot auto-configure les beans essentiels : KafkaTemplate pour la production et ConcurrentKafkaListenerContainerFactory pour la consommation.

yaml
# application.yml
spring:
  kafka:
    # Adresses des brokers Kafka (cluster)
    bootstrap-servers: localhost:9092

    # Configuration du producteur
    producer:
      # Sérialisation des clés en String
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Sérialisation des valeurs en JSON
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Attendre confirmation de tous les replicas
      acks: all
      # Nombre de retries en cas d'échec réseau
      retries: 3

    # Configuration du consommateur
    consumer:
      # Identifiant du groupe de consommateurs
      group-id: order-service
      # Désérialisation des clés
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Désérialisation JSON avec type cible
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Position de départ si aucun offset enregistré
      auto-offset-reset: earliest
      # Désactivation du commit automatique pour contrôle manuel
      enable-auto-commit: false
      properties:
        # Package trusted pour la désérialisation
        spring.json.trusted.packages: com.example.events

La désactivation de enable-auto-commit constitue une pratique essentielle en production. Le commit manuel des offsets garantit qu'un message n'est marqué comme traité qu'après son traitement effectif.

Création d'un producteur Kafka

Le KafkaTemplate encapsule la logique d'envoi vers Kafka. L'injection directe permet une utilisation immédiate dans les services métier.

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Topic de destination (externalisé dans la config)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // Création de l'événement avec métadonnées
        OrderEvent event = OrderEvent.created(payload);

        // Utilisation de l'orderId comme clé de partitionnement
        // Garantit l'ordre des événements pour une même commande
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // Envoi asynchrone avec callback
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Succès : log des métadonnées de l'envoi
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // Échec : log de l'erreur pour investigation
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

L'utilisation d'une clé de partitionnement basée sur l'identifiant métier garantit que tous les événements d'une même entité arrivent dans la même partition, préservant ainsi leur ordre chronologique.

Clé de partitionnement

Une clé null distribue les messages de manière round-robin entre les partitions. Cela maximise le parallélisme mais perd la garantie d'ordre. Le choix de la clé dépend des exigences métier.

Consumer basique avec @KafkaListener

L'annotation @KafkaListener transforme une méthode en consommateur Kafka. Spring gère automatiquement la boucle de polling, la désérialisation et le commit des offsets.

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // Topic(s) à écouter
        topics = "${app.kafka.topics.orders}",
        // Groupe de consommateurs
        groupId = "${spring.kafka.consumer.group-id}",
        // Factory personnalisée pour configuration avancée
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // Payload désérialisé automatiquement
            OrderEvent event,
            // Métadonnées Kafka injectées
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // Acknowledgment pour commit manuel
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // Traitement métier
            processingService.process(event);

            // Commit de l'offset uniquement après traitement réussi
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // L'absence de acknowledge() provoque un retraitement
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

L'injection de Acknowledgment permet le contrôle explicite du commit. Sans appel à acknowledge(), l'offset n'est pas commité et le message sera redistribué au prochain poll.

Configuration avancée du ConsumerFactory

La configuration par défaut convient au développement mais nécessite des ajustements pour la production. Une factory personnalisée permet de contrôler finement le comportement des consommateurs.

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Configuration de connexion
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Désérialisation
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // Gestion des offsets
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Performance : nombre de records par poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // Timeout de session pour détection de pannes
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Intervalle de heartbeat (1/3 du session timeout)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // Timeout max de traitement avant rebalance
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // Configuration du désérialiseur JSON
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // Mode d'acknowledgment manuel
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Nombre de threads consommateurs
        factory.setConcurrency(3);

        // Traitement par batch désactivé (un message à la fois)
        factory.setBatchListener(false);

        return factory;
    }
}

Le paramètre MAX_POLL_INTERVAL_MS_CONFIG définit le délai maximum entre deux appels à poll(). Un dépassement provoque l'exclusion du consommateur du groupe et un rebalance. Cette valeur doit refléter le temps de traitement maximum attendu.

Prêt à réussir tes entretiens Spring Boot ?

Entraîne-toi avec nos simulateurs interactifs, fiches express et tests techniques.

Stratégies de retry avec RetryTemplate

Les erreurs transitoires (indisponibilité temporaire d'un service, timeout réseau) nécessitent des retries automatiques. Spring Kafka s'intègre avec RetryTemplate pour implémenter des politiques de retry sophistiquées.

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Politique de retry : 3 tentatives maximum
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Backoff exponentiel : 1s, 2s, 4s entre les tentatives
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Configuration du retry avec recovery callback
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // Recovery : action après épuisement des retries
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // Backoff exponentiel : 1s initial, max 30s, 3 tentatives
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

Le DefaultErrorHandler remplace l'ancien SeekToCurrentErrorHandler depuis Spring Kafka 2.8. Il offre une API plus claire et des options de configuration étendues.

Implémentation d'une Dead Letter Queue

Après épuisement des retries, les messages en échec doivent être routés vers une Dead Letter Queue (DLQ) pour analyse ultérieure. Cette approche évite la perte de données tout en débloquant le consommateur.

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Stratégie de nommage du topic DLT : topic-original.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // Topic DLT basé sur le topic source
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // ErrorHandler avec DLT recovery
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // Exceptions non-retryables (envoi direct en DLT)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

La distinction entre exceptions retryables et non-retryables optimise le comportement. Une ValidationException indique une donnée malformée qui ne sera pas corrigée par des retries, justifiant un envoi direct en DLT.

Consumer DLT pour retraitement manuel

Un consommateur dédié surveille la DLT et permet le retraitement des messages après correction du problème sous-jacent.

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // Persistance pour analyse et retraitement ultérieur
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // Alerte pour intervention humaine
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
Headers DLT

Spring Kafka enrichit automatiquement les messages DLT avec des headers contenant les métadonnées de l'échec : exception, stacktrace, topic d'origine, partition et offset. Ces informations facilitent le diagnostic.

Gestion de l'idempotence côté consommateur

Kafka garantit la livraison "at least once" : un message peut être délivré plusieurs fois en cas de crash après traitement mais avant commit. L'idempotence côté consommateur évite les effets de bord des retraitements.

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // Vérification : événement déjà traité ?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // Traitement métier
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // Enregistrement du traitement dans la même transaction
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // Index sur eventId pour performances optimales
    boolean existsByEventId(String eventId);

    // Nettoyage des anciens enregistrements
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

L'enregistrement du traitement dans la même transaction que le traitement métier garantit l'atomicité. Un crash après traitement mais avant commit rejouera les deux opérations de manière cohérente.

Pattern Transactional Outbox

Le pattern Outbox résout le problème de la cohérence entre la base de données et Kafka. Au lieu de publier directement vers Kafka, les événements sont d'abord persistés dans une table "outbox" puis relayés par un processus dédié.

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // Création de la commande
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // Création de l'événement outbox dans la même transaction
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

Le relayeur outbox publie les événements vers Kafka de manière asynchrone :

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // Récupération des événements en attente avec verrou
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // Publication vers Kafka
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // Mise à jour du statut
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

Ce pattern garantit qu'un événement n'est publié que si la transaction métier a réussi, éliminant les incohérences entre la base et Kafka.

Passe à la pratique !

Teste tes connaissances avec nos simulateurs d'entretien et tests techniques.

Monitoring et observabilité

La supervision d'un système Kafka nécessite des métriques sur la latence de consommation, le lag et les erreurs. Spring Boot Actuator expose ces métriques via Micrometer.

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // Métrique personnalisée : événements traités
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // Métrique personnalisée : événements en erreur
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // Initialisation des compteurs
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // Timer pour mesurer la latence de traitement
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

Ces métriques permettent de configurer des alertes sur le lag consommateur, le taux d'erreur et la latence de traitement, essentiels pour la détection proactive des problèmes.

Conclusion

Spring Kafka offre une intégration robuste pour construire des architectures event-driven résilientes. La maîtrise des mécanismes de retry, dead letter queues et idempotence constitue le socle d'une application production-ready.

Checklist architecture event-driven avec Spring Kafka :

  • ✅ Configurer le commit manuel des offsets avec AckMode.MANUAL
  • ✅ Utiliser une clé de partitionnement cohérente pour préserver l'ordre des événements
  • ✅ Implémenter des retries avec backoff exponentiel via DefaultErrorHandler
  • ✅ Router les messages en échec vers une Dead Letter Queue
  • ✅ Garantir l'idempotence côté consommateur avec tracking des eventIds
  • ✅ Considérer le pattern Outbox pour la cohérence base/Kafka
  • ✅ Exposer des métriques via Micrometer pour le monitoring
  • ✅ Définir MAX_POLL_INTERVAL_MS selon le temps de traitement maximum

Tags

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

Partager

Articles similaires