Spring Kafka: kiến trúc event-driven với consumer chịu lỗi

Hướng dẫn đầy đủ về Spring Kafka cho kiến trúc event-driven. Cấu hình, consumer chịu lỗi, chính sách retry, dead letter queue và các mẫu sản xuất cho ứng dụng phân tán.

Kiến trúc event-driven với Spring Kafka và consumer chịu lỗi

Apache Kafka đã trở thành tiêu chuẩn de facto cho các kiến trúc event-driven quy mô lớn. Spring Kafka đơn giản hóa việc tích hợp vào các ứng dụng Spring Boot, đồng thời cung cấp những cơ chế chịu lỗi thiết yếu cho môi trường sản xuất. Hướng dẫn này phân tích sâu cấu hình, các mẫu tiêu thụ và chiến lược xử lý lỗi.

Yêu cầu tiên quyết

Hướng dẫn giả định người đọc đã quen với các khái niệm cơ bản của Kafka: topic, partition, consumer group và offset. Trọng tâm đặt vào tích hợp với Spring và các mẫu chịu lỗi.

Vì sao chọn kiến trúc event-driven?

Các kiến trúc event-driven tách rời các thành phần hệ thống thông qua sự kiện bất đồng bộ. Khác với các lời gọi REST đồng bộ, producer phát sự kiện mà không cần chờ phản hồi, cho phép consumer xử lý theo nhịp riêng.

Cách tiếp cận này mang lại nhiều lợi ích then chốt: khả năng mở rộng theo chiều ngang độc lập cho từng dịch vụ, độ bền cao hơn trước các sự cố tạm thời và khả năng truy vết toàn diện nhờ log bất biến của Kafka.

OrderEvent.javajava
public record OrderEvent(
    // Định danh duy nhất của sự kiện cho tính bất biến (idempotence)
    String eventId,
    // Loại sự kiện dùng cho định tuyến
    String eventType,
    // Dấu thời gian khởi tạo
    Instant createdAt,
    // Payload nghiệp vụ
    OrderPayload payload
) {
    // Phương thức factory bảo đảm tính duy nhất của eventId
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

Cấu trúc sự kiện luôn bao gồm một định danh duy nhất, một loại và một dấu thời gian. Các metadata này hỗ trợ lọc ở phía consumer, phát hiện trùng lặp và truy vết theo thời gian.

Cấu hình cơ bản Spring Kafka

Việc tích hợp bắt đầu với starter spring-kafka và cấu hình YAML tối thiểu. Spring Boot tự động cấu hình các bean thiết yếu: KafkaTemplate để sản xuất và ConcurrentKafkaListenerContainerFactory để tiêu thụ.

yaml
# application.yml
spring:
  kafka:
    # Địa chỉ broker Kafka (cluster)
    bootstrap-servers: localhost:9092

    # Cấu hình producer
    producer:
      # Tuần tự hóa key kiểu String
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Tuần tự hóa value JSON
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Chờ xác nhận từ tất cả các replica
      acks: all
      # Số lần thử lại khi lỗi mạng
      retries: 3

    # Cấu hình consumer
    consumer:
      # Định danh consumer group
      group-id: order-service
      # Giải tuần tự key
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Giải tuần tự JSON theo kiểu đích
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Vị trí khởi đầu khi chưa có offset
      auto-offset-reset: earliest
      # Tắt auto-commit để kiểm soát thủ công
      enable-auto-commit: false
      properties:
        # Package tin cậy cho deserialization
        spring.json.trusted.packages: com.example.events

Tắt enable-auto-commit là một thực hành thiết yếu trong môi trường sản xuất. Commit offset thủ công bảo đảm một thông điệp chỉ được đánh dấu là đã xử lý sau khi việc xử lý thực sự hoàn tất.

Tạo producer Kafka

KafkaTemplate đóng gói logic gửi đến Kafka. Việc tiêm trực tiếp cho phép sử dụng ngay trong các dịch vụ nghiệp vụ.

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Topic đích (đặt bên ngoài cấu hình)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // Tạo sự kiện kèm metadata
        OrderEvent event = OrderEvent.created(payload);

        // Sử dụng orderId làm key phân vùng
        // Bảo đảm thứ tự sự kiện cho cùng một đơn hàng
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // Gửi bất đồng bộ kèm callback
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Thành công: ghi log metadata gửi
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // Thất bại: ghi log lỗi để điều tra
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

Việc dùng key phân vùng dựa trên định danh nghiệp vụ bảo đảm mọi sự kiện của cùng một thực thể rơi vào cùng một partition, qua đó duy trì thứ tự thời gian.

Key phân vùng

Key null sẽ phân phối thông điệp theo round-robin giữa các partition. Cách đó tối đa hóa tính song song nhưng đánh đổi đảm bảo về thứ tự. Việc chọn key tùy thuộc yêu cầu nghiệp vụ.

Consumer cơ bản với @KafkaListener

Annotation @KafkaListener biến một phương thức thành consumer Kafka. Spring tự động xử lý vòng polling, deserialization và commit offset.

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // Topic cần lắng nghe
        topics = "${app.kafka.topics.orders}",
        // Consumer group
        groupId = "${spring.kafka.consumer.group-id}",
        // Factory tùy biến cho cấu hình nâng cao
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // Payload đã được giải tuần tự tự động
            OrderEvent event,
            // Metadata Kafka được tiêm vào
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // Acknowledgment để commit thủ công
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // Xử lý nghiệp vụ
            processingService.process(event);

            // Chỉ commit offset sau khi xử lý thành công
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // Việc thiếu acknowledge() sẽ kích hoạt xử lý lại
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

Tiêm Acknowledgment cho phép kiểm soát commit một cách rõ ràng. Nếu không gọi acknowledge(), offset vẫn chưa được commit và thông điệp sẽ được phân phối lại ở lần poll kế tiếp.

Cấu hình ConsumerFactory nâng cao

Cấu hình mặc định phù hợp cho phát triển nhưng cần điều chỉnh khi triển khai thực tế. Một factory tùy biến mang lại sự kiểm soát chi tiết đối với hành vi của consumer.

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Cấu hình kết nối
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Giải tuần tự
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // Quản lý offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Hiệu năng: số bản ghi mỗi poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // Timeout phiên để phát hiện lỗi
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Khoảng heartbeat (1/3 timeout phiên)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // Thời gian xử lý tối đa trước khi rebalance
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // Cấu hình deserializer JSON
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // Chế độ acknowledgment thủ công
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Số luồng consumer
        factory.setConcurrency(3);

        // Tắt xử lý theo lô (mỗi lần một thông điệp)
        factory.setBatchListener(false);

        return factory;
    }
}

Tham số MAX_POLL_INTERVAL_MS_CONFIG xác định độ trễ tối đa giữa các lần gọi poll(). Vượt quá ngưỡng này khiến consumer bị loại khỏi group và xảy ra rebalance. Giá trị này nên phản ánh thời gian xử lý tối đa được kỳ vọng.

Sẵn sàng chinh phục phỏng vấn Spring Boot?

Luyện tập với mô phỏng tương tác, flashcards và bài kiểm tra kỹ thuật.

Chiến lược retry với RetryTemplate

Các lỗi tạm thời (dịch vụ tạm không sẵn sàng, timeout mạng) đòi hỏi retry tự động. Spring Kafka tích hợp với RetryTemplate để triển khai các chính sách retry tinh vi.

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Chính sách retry: tối đa 3 lần thử
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Backoff lũy thừa: 1s, 2s, 4s giữa các lần thử
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Cấu hình retry với callback recovery
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // Recovery: hành động sau khi hết lượt thử
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // Backoff lũy thừa: 1s khởi đầu, tối đa 30s, 3 lần thử
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

DefaultErrorHandler thay thế SeekToCurrentErrorHandler cũ kể từ Spring Kafka 2.8. Nó cung cấp API rõ ràng hơn và nhiều tùy chọn cấu hình mở rộng.

Triển khai Dead Letter Queue

Sau khi hết lượt thử, các thông điệp lỗi nên được chuyển hướng vào Dead Letter Queue (DLQ) để phân tích sau. Cách tiếp cận này tránh mất dữ liệu và giải phóng consumer khỏi tình trạng kẹt.

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Quy ước đặt tên topic DLT: original-topic.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // Topic DLT dựa trên topic nguồn
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // ErrorHandler kèm recovery DLT
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // Các exception không nên thử lại (gửi thẳng tới DLT)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

Phân biệt giữa exception có thể thử lại và không thể thử lại giúp tối ưu hành vi. ValidationException báo hiệu dữ liệu sai định dạng mà retry không thể sửa, do đó việc gửi thẳng tới DLT là hợp lý.

Consumer DLT cho việc xử lý lại thủ công

Một consumer chuyên biệt giám sát DLT và cho phép xử lý lại thông điệp sau khi đã khắc phục nguyên nhân gốc.

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // Lưu trữ phục vụ phân tích và xử lý lại sau này
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // Cảnh báo cho can thiệp của con người
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
Các header DLT

Spring Kafka tự động làm giàu các thông điệp DLT bằng các header chứa metadata sự cố: exception, stacktrace, topic gốc, partition và offset. Thông tin này hỗ trợ việc chẩn đoán.

Quản lý tính bất biến ở phía consumer

Kafka đảm bảo phân phối "at least once": một thông điệp có thể được phân phối nhiều lần nếu xảy ra crash sau khi xử lý nhưng trước khi commit. Tính bất biến phía consumer ngăn các tác dụng phụ do xử lý lại.

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // Kiểm tra: sự kiện đã được xử lý chưa?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // Xử lý nghiệp vụ
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // Ghi nhận xử lý trong cùng một transaction
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // Chỉ mục trên eventId để hiệu năng tối ưu
    boolean existsByEventId(String eventId);

    // Dọn các bản ghi cũ
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

Việc ghi nhận xử lý trong cùng transaction với logic nghiệp vụ bảo đảm tính nguyên tử. Một sự cố xảy ra sau khi xử lý nhưng trước khi commit sẽ phát lại cả hai thao tác một cách nhất quán.

Mẫu Transactional Outbox

Mẫu Outbox giải quyết vấn đề nhất quán giữa cơ sở dữ liệu và Kafka. Thay vì publish trực tiếp tới Kafka, sự kiện được lưu vào bảng "outbox" trước, sau đó được chuyển tiếp bởi một tiến trình chuyên biệt.

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // Tạo đơn hàng
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // Tạo sự kiện outbox trong cùng transaction
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

Bộ relayer outbox publish các sự kiện tới Kafka theo cách bất đồng bộ:

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // Lấy các sự kiện đang chờ kèm khóa
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // Publish tới Kafka
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // Cập nhật trạng thái
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

Mẫu này bảo đảm sự kiện chỉ được publish khi transaction nghiệp vụ thành công, từ đó loại bỏ tình trạng không nhất quán giữa cơ sở dữ liệu và Kafka.

Bắt đầu luyện tập!

Kiểm tra kiến thức với mô phỏng phỏng vấn và bài kiểm tra kỹ thuật.

Giám sát và khả năng quan sát

Việc giám sát một hệ thống Kafka đòi hỏi các metric về độ trễ tiêu thụ, lag và lỗi. Spring Boot Actuator phơi bày các metric này thông qua Micrometer.

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // Metric tùy biến: sự kiện đã xử lý
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // Metric tùy biến: sự kiện thất bại
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // Khởi tạo bộ đếm
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // Timer đo độ trễ xử lý
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

Các metric này cho phép cấu hình cảnh báo về lag consumer, tỷ lệ lỗi và độ trễ xử lý — yếu tố thiết yếu để phát hiện vấn đề một cách chủ động.

Kết luận

Spring Kafka mang lại tích hợp vững chắc để xây dựng các kiến trúc event-driven có khả năng chịu lỗi. Việc nắm vững các cơ chế retry, dead letter queue và tính bất biến tạo nên nền tảng cho các ứng dụng sẵn sàng cho môi trường sản xuất.

Danh sách kiểm tra cho kiến trúc event-driven với Spring Kafka:

  • ✅ Cấu hình commit offset thủ công với AckMode.MANUAL
  • ✅ Dùng key phân vùng nhất quán để giữ thứ tự sự kiện
  • ✅ Triển khai retry với backoff lũy thừa qua DefaultErrorHandler
  • ✅ Định tuyến thông điệp lỗi vào Dead Letter Queue
  • ✅ Bảo đảm tính bất biến phía consumer thông qua theo dõi eventId
  • ✅ Cân nhắc mẫu Outbox để giữ nhất quán giữa cơ sở dữ liệu và Kafka
  • ✅ Phơi bày metric qua Micrometer cho việc giám sát
  • ✅ Điều chỉnh MAX_POLL_INTERVAL_MS theo thời gian xử lý tối đa

Thẻ

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

Chia sẻ

Bài viết liên quan