Spring Kafka: architektura event-driven z odpornymi konsumentami

Kompletny przewodnik po Spring Kafka dla architektur event-driven. Konfiguracja, odporni konsumenci, polityki retry, dead letter queue i wzorce produkcyjne dla aplikacji rozproszonych.

Architektura event-driven ze Spring Kafka i odpornymi konsumentami

Apache Kafka stał się de facto standardem dla architektur sterowanych zdarzeniami na dużą skalę. Spring Kafka upraszcza jego integrację z aplikacjami Spring Boot, zapewniając jednocześnie kluczowe mechanizmy odporności w środowiskach produkcyjnych. Niniejszy przewodnik dogłębnie omawia konfigurację, wzorce konsumpcji i strategie obsługi błędów.

Wymagania wstępne

Przewodnik zakłada znajomość podstawowych pojęć Kafki: topiki, partycje, consumer groups oraz offsety. Nacisk położono na integrację ze Springiem i wzorce odporności.

Dlaczego wybrać architekturę event-driven?

Architektury event-driven rozdzielają komponenty systemu poprzez asynchroniczne zdarzenia. W przeciwieństwie do synchronicznych wywołań REST, producenci emitują zdarzenia bez oczekiwania na odpowiedź, pozwalając konsumentom przetwarzać je we własnym tempie.

Takie podejście przynosi kilka kluczowych korzyści: niezależną skalowalność horyzontalną poszczególnych usług, większą odporność na chwilowe awarie oraz pełną śledzialność dzięki niemodyfikowalnemu logowi Kafki.

OrderEvent.javajava
public record OrderEvent(
    // Unikalny identyfikator zdarzenia dla idempotentności
    String eventId,
    // Typ zdarzenia do routingu
    String eventType,
    // Znacznik czasu utworzenia
    Instant createdAt,
    // Payload biznesowy
    OrderPayload payload
) {
    // Metoda fabrykująca gwarantująca unikalność eventId
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

Struktura zdarzenia systematycznie zawiera unikalny identyfikator, typ oraz znacznik czasu. Te metadane umożliwiają filtrowanie po stronie konsumenta, wykrywanie duplikatów i śledzenie czasowe.

Podstawowa konfiguracja Spring Kafka

Integracja zaczyna się od startera spring-kafka i minimalnej konfiguracji YAML. Spring Boot automatycznie konfiguruje kluczowe beany: KafkaTemplate do produkcji oraz ConcurrentKafkaListenerContainerFactory do konsumpcji.

yaml
# application.yml
spring:
  kafka:
    # Adresy brokerów Kafka (klaster)
    bootstrap-servers: localhost:9092

    # Konfiguracja producenta
    producer:
      # Serializacja kluczy String
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializacja wartości JSON
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Oczekiwanie na potwierdzenie wszystkich replik
      acks: all
      # Liczba ponownych prób przy błędzie sieci
      retries: 3

    # Konfiguracja konsumenta
    consumer:
      # Identyfikator consumer group
      group-id: order-service
      # Deserializacja kluczy
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializacja JSON z typem docelowym
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Pozycja startowa, gdy brak zarejestrowanego offsetu
      auto-offset-reset: earliest
      # Wyłączenie auto-commit dla ręcznej kontroli
      enable-auto-commit: false
      properties:
        # Zaufany pakiet do deserializacji
        spring.json.trusted.packages: com.example.events

Wyłączenie enable-auto-commit to kluczowa praktyka produkcyjna. Ręczne commity offsetów gwarantują, że wiadomość zostaje oznaczona jako przetworzona dopiero po faktycznym zakończeniu przetwarzania.

Tworzenie producenta Kafki

KafkaTemplate enkapsuluje logikę wysyłania do Kafki. Bezpośrednie wstrzyknięcie umożliwia natychmiastowe użycie wewnątrz serwisów biznesowych.

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Topik docelowy (wyłuskany z konfiguracji)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // Tworzenie zdarzenia z metadanymi
        OrderEvent event = OrderEvent.created(payload);

        // Użycie orderId jako klucza partycji
        // Gwarantuje kolejność zdarzeń dla tego samego zamówienia
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // Asynchroniczna wysyłka z callbackiem
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Sukces: log metadanych wysyłki
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // Błąd: log błędu do dalszej analizy
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

Użycie klucza partycji opartego na identyfikatorze biznesowym zapewnia, że wszystkie zdarzenia tej samej encji trafią do tej samej partycji, dzięki czemu zachowana zostaje ich kolejność chronologiczna.

Klucz partycji

Pusty klucz rozprowadza wiadomości metodą round-robin między partycjami. Maksymalizuje to równoległość, ale traci gwarancje porządku. Wybór klucza zależy od wymagań biznesowych.

Podstawowy konsument z @KafkaListener

Adnotacja @KafkaListener przekształca metodę w konsumenta Kafki. Spring automatycznie obsługuje pętlę pollingu, deserializację oraz commity offsetów.

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // Topik(i) do nasłuchu
        topics = "${app.kafka.topics.orders}",
        // Consumer group
        groupId = "${spring.kafka.consumer.group-id}",
        // Niestandardowa fabryka dla zaawansowanej konfiguracji
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // Payload zdeserializowany automatycznie
            OrderEvent event,
            // Wstrzyknięte metadane Kafki
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // Acknowledgment do ręcznego commita
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // Przetwarzanie biznesowe
            processingService.process(event);

            // Commit offsetu wyłącznie po pomyślnym przetworzeniu
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // Brak acknowledge() powoduje ponowne przetworzenie
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

Wstrzyknięcie Acknowledgment pozwala na jawną kontrolę commita. Bez wywołania acknowledge() offset pozostaje niezatwierdzony, a wiadomość zostanie ponownie dostarczona przy kolejnym pollingu.

Zaawansowana konfiguracja ConsumerFactory

Domyślna konfiguracja sprawdza się w środowisku deweloperskim, ale produkcja wymaga dostosowań. Niestandardowa fabryka oferuje precyzyjną kontrolę nad zachowaniem konsumenta.

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Konfiguracja połączenia
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Deserializacja
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // Zarządzanie offsetami
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Wydajność: rekordy na pojedynczy poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // Timeout sesji do wykrywania awarii
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Odstęp heartbeat (1/3 timeoutu sesji)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // Maksymalny czas przetwarzania przed rebalansem
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // Konfiguracja deserializatora JSON
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // Ręczny tryb acknowledgmentu
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Liczba wątków konsumenta
        factory.setConcurrency(3);

        // Przetwarzanie wsadowe wyłączone (jedna wiadomość naraz)
        factory.setBatchListener(false);

        return factory;
    }
}

Parametr MAX_POLL_INTERVAL_MS_CONFIG definiuje maksymalne opóźnienie między wywołaniami poll(). Jego przekroczenie powoduje wykluczenie konsumenta z grupy i rebalans. Wartość powinna odzwierciedlać maksymalny oczekiwany czas przetwarzania.

Gotowy na rozmowy o Spring Boot?

Ćwicz z naszymi interaktywnymi symulatorami, flashcards i testami technicznymi.

Strategie ponawiania z RetryTemplate

Błędy przejściowe (chwilowa niedostępność usługi, timeouty sieciowe) wymagają automatycznych ponownych prób. Spring Kafka integruje się z RetryTemplate w celu wdrożenia zaawansowanych polityk retry.

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Polityka retry: maks. 3 próby
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Wykładniczy backoff: 1s, 2s, 4s między próbami
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Konfiguracja retry z callbackiem recovery
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // Recovery: akcja po wyczerpaniu prób
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // Wykładniczy backoff: 1s start, maks. 30s, 3 próby
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

DefaultErrorHandler zastępuje starszy SeekToCurrentErrorHandler od Spring Kafka 2.8. Oferuje czytelniejsze API i rozszerzone opcje konfiguracyjne.

Implementacja Dead Letter Queue

Po wyczerpaniu prób nieudane wiadomości należy kierować do Dead Letter Queue (DLQ) w celu późniejszej analizy. Takie podejście zapobiega utracie danych, jednocześnie odblokowując konsumenta.

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Strategia nazewnictwa topiku DLT: original-topic.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // Topik DLT bazujący na topiku źródłowym
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // ErrorHandler z recovery DLT
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // Wyjątki nieretryowalne (bezpośrednia wysyłka do DLT)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

Rozróżnianie wyjątków retryowalnych i nieretryowalnych optymalizuje zachowanie. ValidationException wskazuje na nieprawidłowe dane, których ponowne próby nie naprawią, co uzasadnia bezpośrednie kierowanie do DLT.

Konsument DLT do ręcznego ponownego przetwarzania

Dedykowany konsument monitoruje DLT i umożliwia ponowne przetworzenie wiadomości po naprawieniu pierwotnego problemu.

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // Persystencja do analizy i późniejszego reprocessingu
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // Alert do interwencji człowieka
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
Nagłówki DLT

Spring Kafka automatycznie wzbogaca wiadomości DLT o nagłówki zawierające metadane awarii: wyjątek, stacktrace, oryginalny topik, partycję i offset. Te informacje ułatwiają diagnostykę.

Obsługa idempotentności po stronie konsumenta

Kafka gwarantuje dostawę "at least once": wiadomość może zostać dostarczona wielokrotnie, jeśli wystąpi awaria po przetworzeniu, ale przed commitem. Idempotentność po stronie konsumenta zapobiega skutkom ubocznym ponownego przetwarzania.

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // Weryfikacja: zdarzenie już przetworzone?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // Przetwarzanie biznesowe
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // Zapis przetwarzania w tej samej transakcji
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // Indeks na eventId dla optymalnej wydajności
    boolean existsByEventId(String eventId);

    // Czyszczenie starych rekordów
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

Zapis przetwarzania w tej samej transakcji co logika biznesowa gwarantuje atomowość. Awaria po przetworzeniu, ale przed commitem, spójnie odtworzy obie operacje.

Wzorzec Transactional Outbox

Wzorzec Outbox rozwiązuje problemy spójności między bazą danych a Kafką. Zamiast publikować bezpośrednio do Kafki, zdarzenia są najpierw zapisywane w tabeli "outbox", a następnie przekazywane przez dedykowany proces.

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // Utworzenie zamówienia
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // Utworzenie zdarzenia outbox w tej samej transakcji
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

Outbox relayer publikuje zdarzenia do Kafki asynchronicznie:

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // Pobranie oczekujących zdarzeń z lockiem
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // Publikacja do Kafki
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // Aktualizacja statusu
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

Ten wzorzec gwarantuje, że zdarzenie zostanie opublikowane wyłącznie wtedy, gdy transakcja biznesowa zakończyła się powodzeniem, eliminując niespójności między bazą danych a Kafką.

Zacznij ćwiczyć!

Sprawdź swoją wiedzę z naszymi symulatorami rozmów i testami technicznymi.

Monitoring i obserwowalność

Nadzór nad systemem Kafka wymaga metryk dotyczących opóźnienia konsumpcji, lagu i błędów. Spring Boot Actuator udostępnia te metryki przez Micrometer.

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // Niestandardowa metryka: przetworzone zdarzenia
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // Niestandardowa metryka: zdarzenia zakończone niepowodzeniem
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // Inicjalizacja liczników
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // Timer mierzący opóźnienie przetwarzania
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

Te metryki pozwalają konfigurować alerty dotyczące lagu konsumenta, wskaźnika błędów i opóźnienia przetwarzania, kluczowe dla proaktywnego wykrywania problemów.

Podsumowanie

Spring Kafka zapewnia solidną integrację do budowy odpornych architektur event-driven. Opanowanie mechanizmów retry, dead letter queue oraz idempotentności stanowi fundament aplikacji gotowych do produkcji.

Lista kontrolna architektury event-driven ze Spring Kafka:

  • ✅ Skonfigurować ręczne commity offsetów z AckMode.MANUAL
  • ✅ Stosować spójny klucz partycji, aby zachować kolejność zdarzeń
  • ✅ Wdrożyć retry z wykładniczym backoffem za pomocą DefaultErrorHandler
  • ✅ Kierować nieudane wiadomości do Dead Letter Queue
  • ✅ Zapewnić idempotentność po stronie konsumenta przez śledzenie eventId
  • ✅ Rozważyć wzorzec Outbox dla spójności bazy danych i Kafki
  • ✅ Udostępnić metryki przez Micrometer do monitoringu
  • ✅ Dostosować MAX_POLL_INTERVAL_MS do maksymalnego czasu przetwarzania

Tagi

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

Udostępnij

Powiązane artykuły