Spring Kafka: подієва архітектура з відмовостійкими споживачами

Повний посібник зі Spring Kafka для подієвих архітектур. Конфігурація, відмовостійкі споживачі, політики retry, dead letter queue та продакшн-шаблони для розподілених застосунків.

Подієва архітектура зі Spring Kafka і відмовостійкими споживачами

Apache Kafka став де-факто стандартом для подієвих архітектур великого масштабу. Spring Kafka спрощує його інтеграцію в застосунки Spring Boot та водночас надає ключові механізми відмовостійкості для робочих середовищ. Цей посібник детально розглядає конфігурацію, шаблони споживання та стратегії обробки помилок.

Передумови

Посібник передбачає знайомство з базовими поняттями Kafka: топіки, партиції, consumer groups та офсети. Акцент зроблено на інтеграції зі Spring і шаблонах відмовостійкості.

Чому обирати подієву архітектуру?

Подієві архітектури роз'єднують компоненти системи через асинхронні події. На відміну від синхронних REST-викликів, продюсери випускають події, не очікуючи відповіді, і це дозволяє споживачам обробляти у власному темпі.

Такий підхід приносить кілька критичних переваг: незалежну горизонтальну масштабованість для кожного сервісу, підвищену відмовостійкість до тимчасових збоїв і повну простежуваність завдяки незмінному логу Kafka.

OrderEvent.javajava
public record OrderEvent(
    // Унікальний ідентифікатор події для ідемпотентності
    String eventId,
    // Тип події для маршрутизації
    String eventType,
    // Час створення
    Instant createdAt,
    // Бізнес-payload
    OrderPayload payload
) {
    // Factory-метод, що гарантує унікальність eventId
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

Структура події систематично містить унікальний ідентифікатор, тип і часову мітку. Ці метадані дозволяють фільтрувати на стороні споживача, виявляти дублікати та відстежувати у часі.

Базова конфігурація Spring Kafka

Інтеграція починається зі стартера spring-kafka і мінімальної YAML-конфігурації. Spring Boot автоматично налаштовує ключові біни: KafkaTemplate для продюсування і ConcurrentKafkaListenerContainerFactory для споживання.

yaml
# application.yml
spring:
  kafka:
    # Адреси брокерів Kafka (кластер)
    bootstrap-servers: localhost:9092

    # Конфігурація продюсера
    producer:
      # Серіалізація String-ключів
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # JSON-серіалізація значень
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Очікування підтвердження від усіх реплік
      acks: all
      # Кількість повторних спроб у разі мережевої помилки
      retries: 3

    # Конфігурація споживача
    consumer:
      # Ідентифікатор consumer group
      group-id: order-service
      # Десеріалізація ключів
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # JSON-десеріалізація з цільовим типом
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Початкова позиція, якщо немає зареєстрованого офсету
      auto-offset-reset: earliest
      # Вимкнути auto-commit для ручного контролю
      enable-auto-commit: false
      properties:
        # Довірений пакет для десеріалізації
        spring.json.trusted.packages: com.example.events

Вимкнення enable-auto-commit — обов'язкова продакшн-практика. Ручні коміти офсетів гарантують, що повідомлення позначається як оброблене лише після фактичного завершення обробки.

Створення продюсера Kafka

KafkaTemplate інкапсулює логіку відправлення в Kafka. Пряме впровадження дає змогу одразу використовувати його у бізнес-сервісах.

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Цільовий топік (винесено в конфігурацію)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // Створення події з метаданими
        OrderEvent event = OrderEvent.created(payload);

        // Використання orderId як ключа партиції
        // Гарантує порядок подій для одного й того ж замовлення
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // Асинхронне надсилання з callback
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Успіх: логування метаданих надсилання
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // Помилка: логування для розслідування
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

Використання ключа партиції на основі бізнес-ідентифікатора гарантує, що всі події однієї сутності потраплять до однієї партиції, зберігаючи їх хронологічний порядок.

Ключ партиції

Нульовий ключ розподіляє повідомлення між партиціями за принципом round-robin. Це максимізує паралелізм, але втрачає гарантії порядку. Вибір ключа залежить від бізнес-вимог.

Базовий споживач із @KafkaListener

Анотація @KafkaListener перетворює метод на споживача Kafka. Spring автоматично керує циклом polling, десеріалізацією та комітами офсетів.

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // Топік(и) для прослуховування
        topics = "${app.kafka.topics.orders}",
        // Consumer group
        groupId = "${spring.kafka.consumer.group-id}",
        // Кастомна factory для розширеної конфігурації
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // Автоматично десеріалізований payload
            OrderEvent event,
            // Впроваджені метадані Kafka
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // Acknowledgment для ручного коміту
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // Бізнес-обробка
            processingService.process(event);

            // Коміт офсету лише після успішної обробки
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // Відсутність acknowledge() призводить до повторної обробки
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

Впровадження Acknowledgment дає змогу явно контролювати коміт. Без виклику acknowledge() офсет залишається не закомічений, і повідомлення буде доставлене знову при наступному polling.

Розширена конфігурація ConsumerFactory

Стандартна конфігурація придатна для розробки, але вимагає коригувань для продакшену. Кастомна factory надає тонкий контроль над поведінкою споживача.

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Налаштування з'єднання
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Десеріалізація
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // Керування офсетами
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Продуктивність: записи на один poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // Тайм-аут сесії для виявлення збоїв
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Інтервал heartbeat (1/3 від тайм-ауту сесії)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // Максимальний час обробки до rebalance
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // Конфігурація JSON-десеріалізатора
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // Режим ручного acknowledgment
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Кількість потоків споживача
        factory.setConcurrency(3);

        // Пакетна обробка вимкнена (одне повідомлення за раз)
        factory.setBatchListener(false);

        return factory;
    }
}

Параметр MAX_POLL_INTERVAL_MS_CONFIG визначає максимальну затримку між викликами poll(). Її перевищення спричиняє виключення споживача з групи та rebalance. Це значення має відображати максимальний очікуваний час обробки.

Готовий до співбесід з Spring Boot?

Практикуйся з нашими інтерактивними симуляторами, flashcards та технічними тестами.

Стратегії повторних спроб з RetryTemplate

Тимчасові помилки (короткочасна недоступність сервісу, мережеві тайм-аути) потребують автоматичних повторних спроб. Spring Kafka інтегрується з RetryTemplate для впровадження складних retry-політик.

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Політика retry: максимум 3 спроби
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Експоненційне backoff: 1с, 2с, 4с між спробами
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Конфігурація retry з recovery-callback
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // Recovery: дія після вичерпання спроб
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // Експоненційне backoff: початкове 1с, макс 30с, 3 спроби
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

DefaultErrorHandler замінив старий SeekToCurrentErrorHandler починаючи зі Spring Kafka 2.8. Він пропонує чіткіший API і розширені опції конфігурації.

Реалізація Dead Letter Queue

Після вичерпання спроб невдалі повідомлення слід направляти у Dead Letter Queue (DLQ) для подальшого аналізу. Цей підхід запобігає втраті даних і водночас розблоковує споживача.

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Стратегія найменування DLT-топіка: original-topic.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // DLT-топік на основі вихідного топіка
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // ErrorHandler з DLT-recovery
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // Винятки, що не підлягають повтору (прямо в DLT)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

Розрізнення винятків, які можна повторювати, і тих, що не підлягають повтору, оптимізує поведінку. ValidationException сигналізує про погано сформовані дані, які повторні спроби не виправлять, тож пряма маршрутизація до DLT є виправданою.

DLT-споживач для ручного перепрацювання

Виділений споживач спостерігає за DLT і дозволяє повторно обробити повідомлення після усунення першопричини проблеми.

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // Збереження для аналізу і подальшої повторної обробки
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // Сповіщення для людського втручання
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
Заголовки DLT

Spring Kafka автоматично збагачує DLT-повідомлення заголовками з метаданими про збій: виняток, stacktrace, оригінальний топік, партиція та офсет. Ця інформація полегшує діагностику.

Управління ідемпотентністю на стороні споживача

Kafka гарантує доставку "at least once": повідомлення може бути доставлене кілька разів, якщо аварія сталася після обробки, але до коміту. Ідемпотентність на стороні споживача запобігає побічним ефектам повторної обробки.

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // Перевірка: подію вже оброблено?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // Бізнес-обробка
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // Запис обробки в тій самій транзакції
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // Індекс по eventId для оптимальної продуктивності
    boolean existsByEventId(String eventId);

    // Очищення старих записів
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

Запис обробки в тій самій транзакції, що й бізнес-логіка, гарантує атомарність. Аварія після обробки, але до коміту, відтворить обидві операції узгоджено.

Шаблон Transactional Outbox

Шаблон Outbox розв'язує проблеми узгодженості між базою даних і Kafka. Замість того щоб публікувати безпосередньо в Kafka, події спершу зберігаються в таблиці "outbox", а потім ретранслюються виділеним процесом.

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // Створення замовлення
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // Створення outbox-події в тій самій транзакції
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

Outbox-релеєр публікує події в Kafka асинхронно:

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // Отримання очікуваних подій із блокуванням
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // Публікація в Kafka
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // Оновлення статусу
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

Цей шаблон гарантує, що подія публікується тільки за умови успіху бізнес-транзакції, усуваючи невідповідності між базою даних і Kafka.

Починай практикувати!

Перевір свої знання з нашими симуляторами співбесід та технічними тестами.

Моніторинг та спостережуваність

Контроль над Kafka-системою потребує метрик щодо затримки споживання, lag і помилок. Spring Boot Actuator надає ці метрики через Micrometer.

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // Кастомна метрика: оброблені події
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // Кастомна метрика: невдалі події
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // Ініціалізація лічильників
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // Таймер для вимірювання затримки обробки
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

Ці метрики дозволяють налаштувати сповіщення щодо lag споживача, рівня помилок і затримки обробки, що є необхідним для проактивного виявлення проблем.

Висновок

Spring Kafka надає надійну інтеграцію для побудови відмовостійких подієвих архітектур. Володіння механізмами retry, dead letter queue та ідемпотентністю формує основу застосунків, готових до продакшену.

Чек-лист подієвої архітектури зі Spring Kafka:

  • ✅ Налаштувати ручні коміти офсетів із AckMode.MANUAL
  • ✅ Використовувати узгоджений ключ партиції для збереження порядку подій
  • ✅ Реалізувати retry з експоненційним backoff через DefaultErrorHandler
  • ✅ Маршрутизувати невдалі повідомлення до Dead Letter Queue
  • ✅ Гарантувати ідемпотентність на стороні споживача через відстеження eventId
  • ✅ Розглянути шаблон Outbox для узгодженості бази даних і Kafka
  • ✅ Експонувати метрики через Micrometer для моніторингу
  • ✅ Налаштувати MAX_POLL_INTERVAL_MS відповідно до максимального часу обробки

Теги

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

Поділитися

Пов'язані статті