Spring Kafka: kiến trúc event-driven với consumer chịu lỗi
Hướng dẫn đầy đủ về Spring Kafka cho kiến trúc event-driven. Cấu hình, consumer chịu lỗi, chính sách retry, dead letter queue và các mẫu sản xuất cho ứng dụng phân tán.

Apache Kafka đã trở thành tiêu chuẩn de facto cho các kiến trúc event-driven quy mô lớn. Spring Kafka đơn giản hóa việc tích hợp vào các ứng dụng Spring Boot, đồng thời cung cấp những cơ chế chịu lỗi thiết yếu cho môi trường sản xuất. Hướng dẫn này phân tích sâu cấu hình, các mẫu tiêu thụ và chiến lược xử lý lỗi.
Hướng dẫn giả định người đọc đã quen với các khái niệm cơ bản của Kafka: topic, partition, consumer group và offset. Trọng tâm đặt vào tích hợp với Spring và các mẫu chịu lỗi.
Vì sao chọn kiến trúc event-driven?
Các kiến trúc event-driven tách rời các thành phần hệ thống thông qua sự kiện bất đồng bộ. Khác với các lời gọi REST đồng bộ, producer phát sự kiện mà không cần chờ phản hồi, cho phép consumer xử lý theo nhịp riêng.
Cách tiếp cận này mang lại nhiều lợi ích then chốt: khả năng mở rộng theo chiều ngang độc lập cho từng dịch vụ, độ bền cao hơn trước các sự cố tạm thời và khả năng truy vết toàn diện nhờ log bất biến của Kafka.
public record OrderEvent(
// Định danh duy nhất của sự kiện cho tính bất biến (idempotence)
String eventId,
// Loại sự kiện dùng cho định tuyến
String eventType,
// Dấu thời gian khởi tạo
Instant createdAt,
// Payload nghiệp vụ
OrderPayload payload
) {
// Phương thức factory bảo đảm tính duy nhất của eventId
public static OrderEvent created(OrderPayload payload) {
return new OrderEvent(
UUID.randomUUID().toString(),
"ORDER_CREATED",
Instant.now(),
payload
);
}
}
public record OrderPayload(
Long orderId,
Long customerId,
List<OrderItem> items,
BigDecimal totalAmount
) {}Cấu trúc sự kiện luôn bao gồm một định danh duy nhất, một loại và một dấu thời gian. Các metadata này hỗ trợ lọc ở phía consumer, phát hiện trùng lặp và truy vết theo thời gian.
Cấu hình cơ bản Spring Kafka
Việc tích hợp bắt đầu với starter spring-kafka và cấu hình YAML tối thiểu. Spring Boot tự động cấu hình các bean thiết yếu: KafkaTemplate để sản xuất và ConcurrentKafkaListenerContainerFactory để tiêu thụ.
# application.yml
spring:
kafka:
# Địa chỉ broker Kafka (cluster)
bootstrap-servers: localhost:9092
# Cấu hình producer
producer:
# Tuần tự hóa key kiểu String
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Tuần tự hóa value JSON
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# Chờ xác nhận từ tất cả các replica
acks: all
# Số lần thử lại khi lỗi mạng
retries: 3
# Cấu hình consumer
consumer:
# Định danh consumer group
group-id: order-service
# Giải tuần tự key
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Giải tuần tự JSON theo kiểu đích
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# Vị trí khởi đầu khi chưa có offset
auto-offset-reset: earliest
# Tắt auto-commit để kiểm soát thủ công
enable-auto-commit: false
properties:
# Package tin cậy cho deserialization
spring.json.trusted.packages: com.example.eventsTắt enable-auto-commit là một thực hành thiết yếu trong môi trường sản xuất. Commit offset thủ công bảo đảm một thông điệp chỉ được đánh dấu là đã xử lý sau khi việc xử lý thực sự hoàn tất.
Tạo producer Kafka
KafkaTemplate đóng gói logic gửi đến Kafka. Việc tiêm trực tiếp cho phép sử dụng ngay trong các dịch vụ nghiệp vụ.
@Service
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
// Topic đích (đặt bên ngoài cấu hình)
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
OrderPayload payload) {
// Tạo sự kiện kèm metadata
OrderEvent event = OrderEvent.created(payload);
// Sử dụng orderId làm key phân vùng
// Bảo đảm thứ tự sự kiện cho cùng một đơn hàng
String partitionKey = payload.orderId().toString();
log.info("Publishing ORDER_CREATED event: {} to topic: {}",
event.eventId(), ordersTopic);
// Gửi bất đồng bộ kèm callback
return kafkaTemplate.send(ordersTopic, partitionKey, event)
.whenComplete((result, ex) -> {
if (ex == null) {
// Thành công: ghi log metadata gửi
RecordMetadata metadata = result.getRecordMetadata();
log.info("Event sent successfully: topic={}, partition={}, offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
// Thất bại: ghi log lỗi để điều tra
log.error("Failed to send event: {}", event.eventId(), ex);
}
});
}
}Việc dùng key phân vùng dựa trên định danh nghiệp vụ bảo đảm mọi sự kiện của cùng một thực thể rơi vào cùng một partition, qua đó duy trì thứ tự thời gian.
Key null sẽ phân phối thông điệp theo round-robin giữa các partition. Cách đó tối đa hóa tính song song nhưng đánh đổi đảm bảo về thứ tự. Việc chọn key tùy thuộc yêu cầu nghiệp vụ.
Consumer cơ bản với @KafkaListener
Annotation @KafkaListener biến một phương thức thành consumer Kafka. Spring tự động xử lý vòng polling, deserialization và commit offset.
@Service
@Slf4j
public class OrderEventConsumer {
private final OrderProcessingService processingService;
public OrderEventConsumer(OrderProcessingService processingService) {
this.processingService = processingService;
}
@KafkaListener(
// Topic cần lắng nghe
topics = "${app.kafka.topics.orders}",
// Consumer group
groupId = "${spring.kafka.consumer.group-id}",
// Factory tùy biến cho cấu hình nâng cao
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
// Payload đã được giải tuần tự tự động
OrderEvent event,
// Metadata Kafka được tiêm vào
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
// Acknowledgment để commit thủ công
Acknowledgment acknowledgment) {
log.info("Received event: type={}, partition={}, offset={}",
event.eventType(), partition, offset);
try {
// Xử lý nghiệp vụ
processingService.process(event);
// Chỉ commit offset sau khi xử lý thành công
acknowledgment.acknowledge();
log.info("Event processed successfully: {}", event.eventId());
} catch (Exception ex) {
// Việc thiếu acknowledge() sẽ kích hoạt xử lý lại
log.error("Failed to process event: {}", event.eventId(), ex);
throw ex;
}
}
}Tiêm Acknowledgment cho phép kiểm soát commit một cách rõ ràng. Nếu không gọi acknowledge(), offset vẫn chưa được commit và thông điệp sẽ được phân phối lại ở lần poll kế tiếp.
Cấu hình ConsumerFactory nâng cao
Cấu hình mặc định phù hợp cho phát triển nhưng cần điều chỉnh khi triển khai thực tế. Một factory tùy biến mang lại sự kiểm soát chi tiết đối với hành vi của consumer.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, OrderEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// Cấu hình kết nối
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// Giải tuần tự
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// Quản lý offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Hiệu năng: số bản ghi mỗi poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// Timeout phiên để phát hiện lỗi
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// Khoảng heartbeat (1/3 timeout phiên)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
// Thời gian xử lý tối đa trước khi rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// Cấu hình deserializer JSON
JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
deserializer.addTrustedPackages("com.example.events");
deserializer.setUseTypeMapperForKey(false);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
deserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Chế độ acknowledgment thủ công
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Số luồng consumer
factory.setConcurrency(3);
// Tắt xử lý theo lô (mỗi lần một thông điệp)
factory.setBatchListener(false);
return factory;
}
}Tham số MAX_POLL_INTERVAL_MS_CONFIG xác định độ trễ tối đa giữa các lần gọi poll(). Vượt quá ngưỡng này khiến consumer bị loại khỏi group và xảy ra rebalance. Giá trị này nên phản ánh thời gian xử lý tối đa được kỳ vọng.
Sẵn sàng chinh phục phỏng vấn Spring Boot?
Luyện tập với mô phỏng tương tác, flashcards và bài kiểm tra kỹ thuật.
Chiến lược retry với RetryTemplate
Các lỗi tạm thời (dịch vụ tạm không sẵn sàng, timeout mạng) đòi hỏi retry tự động. Spring Kafka tích hợp với RetryTemplate để triển khai các chính sách retry tinh vi.
@Configuration
public class KafkaRetryConfig {
@Bean
public RetryTemplate kafkaRetryTemplate() {
RetryTemplate template = new RetryTemplate();
// Chính sách retry: tối đa 3 lần thử
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
template.setRetryPolicy(retryPolicy);
// Backoff lũy thừa: 1s, 2s, 4s giữa các lần thử
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
retryableKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
RetryTemplate kafkaRetryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Cấu hình retry với callback recovery
factory.setCommonErrorHandler(
new DefaultErrorHandler(
// Recovery: hành động sau khi hết lượt thử
(record, exception) -> {
log.error("All retries exhausted for record: key={}, value={}",
record.key(), record.value(), exception);
},
// Backoff lũy thừa: 1s khởi đầu, tối đa 30s, 3 lần thử
new ExponentialBackOff(1000L, 2.0)
)
);
return factory;
}
}DefaultErrorHandler thay thế SeekToCurrentErrorHandler cũ kể từ Spring Kafka 2.8. Nó cung cấp API rõ ràng hơn và nhiều tùy chọn cấu hình mở rộng.
Triển khai Dead Letter Queue
Sau khi hết lượt thử, các thông điệp lỗi nên được chuyển hướng vào Dead Letter Queue (DLQ) để phân tích sau. Cách tiếp cận này tránh mất dữ liệu và giải phóng consumer khỏi tình trạng kẹt.
@Configuration
public class DeadLetterConfig {
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
KafkaTemplate<String, Object> kafkaTemplate) {
// Quy ước đặt tên topic DLT: original-topic.DLT
return new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, exception) -> {
// Topic DLT dựa trên topic nguồn
String dltTopic = record.topic() + ".DLT";
log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
dltTopic, record.key(), exception.getMessage());
return new TopicPartition(dltTopic, record.partition());
}
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
dltKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// ErrorHandler kèm recovery DLT
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
deadLetterPublishingRecoverer,
new FixedBackOff(1000L, 3)
);
// Các exception không nên thử lại (gửi thẳng tới DLT)
errorHandler.addNotRetryableExceptions(
ValidationException.class,
JsonParseException.class,
NullPointerException.class
);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}Phân biệt giữa exception có thể thử lại và không thể thử lại giúp tối ưu hành vi. ValidationException báo hiệu dữ liệu sai định dạng mà retry không thể sửa, do đó việc gửi thẳng tới DLT là hợp lý.
Consumer DLT cho việc xử lý lại thủ công
Một consumer chuyên biệt giám sát DLT và cho phép xử lý lại thông điệp sau khi đã khắc phục nguyên nhân gốc.
@Service
@Slf4j
public class DeadLetterConsumer {
private final AlertingService alertingService;
private final FailedEventRepository failedEventRepository;
public DeadLetterConsumer(AlertingService alertingService,
FailedEventRepository failedEventRepository) {
this.alertingService = alertingService;
this.failedEventRepository = failedEventRepository;
}
@KafkaListener(
topics = "${app.kafka.topics.orders}.DLT",
groupId = "order-service-dlt"
)
public void handleDeadLetter(
ConsumerRecord<String, OrderEvent> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
@Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
@Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
@Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
Acknowledgment acknowledgment) {
log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
originalTopic, originalPartition, originalOffset, exceptionMessage);
// Lưu trữ phục vụ phân tích và xử lý lại sau này
FailedEvent failedEvent = FailedEvent.builder()
.eventId(record.value().eventId())
.eventType(record.value().eventType())
.payload(serializePayload(record.value()))
.originalTopic(originalTopic)
.originalPartition(originalPartition)
.originalOffset(originalOffset)
.exceptionMessage(exceptionMessage)
.stacktrace(stacktrace)
.status(FailedEventStatus.PENDING)
.createdAt(Instant.now())
.build();
failedEventRepository.save(failedEvent);
// Cảnh báo cho can thiệp của con người
alertingService.notifyDeadLetter(failedEvent);
acknowledgment.acknowledge();
}
}Spring Kafka tự động làm giàu các thông điệp DLT bằng các header chứa metadata sự cố: exception, stacktrace, topic gốc, partition và offset. Thông tin này hỗ trợ việc chẩn đoán.
Quản lý tính bất biến ở phía consumer
Kafka đảm bảo phân phối "at least once": một thông điệp có thể được phân phối nhiều lần nếu xảy ra crash sau khi xử lý nhưng trước khi commit. Tính bất biến phía consumer ngăn các tác dụng phụ do xử lý lại.
@Service
@Slf4j
public class IdempotentOrderProcessor {
private final ProcessedEventRepository processedEventRepository;
private final OrderService orderService;
public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
OrderService orderService) {
this.processedEventRepository = processedEventRepository;
this.orderService = orderService;
}
@Transactional
public void processIdempotent(OrderEvent event) {
String eventId = event.eventId();
// Kiểm tra: sự kiện đã được xử lý chưa?
if (processedEventRepository.existsByEventId(eventId)) {
log.info("Event already processed, skipping: {}", eventId);
return;
}
// Xử lý nghiệp vụ
switch (event.eventType()) {
case "ORDER_CREATED" -> orderService.createOrder(event.payload());
case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
default -> log.warn("Unknown event type: {}", event.eventType());
}
// Ghi nhận xử lý trong cùng một transaction
ProcessedEvent processed = ProcessedEvent.builder()
.eventId(eventId)
.eventType(event.eventType())
.processedAt(Instant.now())
.build();
processedEventRepository.save(processed);
log.info("Event processed and recorded: {}", eventId);
}
}@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {
// Chỉ mục trên eventId để hiệu năng tối ưu
boolean existsByEventId(String eventId);
// Dọn các bản ghi cũ
@Modifying
@Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
int deleteOlderThan(@Param("before") Instant before);
}Việc ghi nhận xử lý trong cùng transaction với logic nghiệp vụ bảo đảm tính nguyên tử. Một sự cố xảy ra sau khi xử lý nhưng trước khi commit sẽ phát lại cả hai thao tác một cách nhất quán.
Mẫu Transactional Outbox
Mẫu Outbox giải quyết vấn đề nhất quán giữa cơ sở dữ liệu và Kafka. Thay vì publish trực tiếp tới Kafka, sự kiện được lưu vào bảng "outbox" trước, sau đó được chuyển tiếp bởi một tiến trình chuyên biệt.
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String eventId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String eventType;
@Column(columnDefinition = "TEXT", nullable = false)
private String payload;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private OutboxStatus status;
@Column(nullable = false)
private Instant createdAt;
private Instant publishedAt;
}
public enum OutboxStatus {
PENDING, PUBLISHED, FAILED
}@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
public OrderService(OrderRepository orderRepository,
OutboxRepository outboxRepository,
ObjectMapper objectMapper) {
this.orderRepository = orderRepository;
this.outboxRepository = outboxRepository;
this.objectMapper = objectMapper;
}
public Order createOrder(CreateOrderRequest request) {
// Tạo đơn hàng
Order order = Order.builder()
.customerId(request.customerId())
.items(request.items())
.totalAmount(calculateTotal(request.items()))
.status(OrderStatus.CREATED)
.createdAt(Instant.now())
.build();
Order savedOrder = orderRepository.save(order);
// Tạo sự kiện outbox trong cùng transaction
OrderEvent event = OrderEvent.created(toPayload(savedOrder));
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(event.eventId())
.aggregateType("Order")
.aggregateId(savedOrder.getId().toString())
.eventType(event.eventType())
.payload(serialize(event))
.status(OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxRepository.save(outboxEvent);
return savedOrder;
}
private String serialize(OrderEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize event", e);
}
}
}Bộ relayer outbox publish các sự kiện tới Kafka theo cách bất đồng bộ:
@Component
@Slf4j
public class OutboxRelayScheduler {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OutboxRelayScheduler(OutboxRepository outboxRepository,
KafkaTemplate<String, String> kafkaTemplate) {
this.outboxRepository = outboxRepository;
this.kafkaTemplate = kafkaTemplate;
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void relayPendingEvents() {
// Lấy các sự kiện đang chờ kèm khóa
List<OutboxEvent> pendingEvents = outboxRepository
.findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
for (OutboxEvent event : pendingEvents) {
try {
// Publish tới Kafka
kafkaTemplate.send(
ordersTopic,
event.getAggregateId(),
event.getPayload()
).get(5, TimeUnit.SECONDS);
// Cập nhật trạng thái
event.setStatus(OutboxStatus.PUBLISHED);
event.setPublishedAt(Instant.now());
outboxRepository.save(event);
log.debug("Outbox event published: {}", event.getEventId());
} catch (Exception ex) {
log.error("Failed to publish outbox event: {}",
event.getEventId(), ex);
event.setStatus(OutboxStatus.FAILED);
outboxRepository.save(event);
}
}
}
}Mẫu này bảo đảm sự kiện chỉ được publish khi transaction nghiệp vụ thành công, từ đó loại bỏ tình trạng không nhất quán giữa cơ sở dữ liệu và Kafka.
Bắt đầu luyện tập!
Kiểm tra kiến thức với mô phỏng phỏng vấn và bài kiểm tra kỹ thuật.
Giám sát và khả năng quan sát
Việc giám sát một hệ thống Kafka đòi hỏi các metric về độ trễ tiêu thụ, lag và lỗi. Spring Boot Actuator phơi bày các metric này thông qua Micrometer.
@Configuration
public class KafkaMetricsConfig {
@Bean
public MeterBinder kafkaConsumerMetrics(
ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
return registry -> {
// Metric tùy biến: sự kiện đã xử lý
Counter.builder("kafka.consumer.events.processed")
.description("Number of events successfully processed")
.tag("topic", "orders")
.register(registry);
// Metric tùy biến: sự kiện thất bại
Counter.builder("kafka.consumer.events.failed")
.description("Number of events that failed processing")
.tag("topic", "orders")
.register(registry);
};
}
}@Service
@Slf4j
public class InstrumentedOrderConsumer {
private final OrderProcessingService processingService;
private final MeterRegistry meterRegistry;
private final Counter processedCounter;
private final Counter failedCounter;
private final Timer processingTimer;
public InstrumentedOrderConsumer(OrderProcessingService processingService,
MeterRegistry meterRegistry) {
this.processingService = processingService;
this.meterRegistry = meterRegistry;
// Khởi tạo bộ đếm
this.processedCounter = Counter.builder("kafka.consumer.events.processed")
.tag("topic", "orders")
.register(meterRegistry);
this.failedCounter = Counter.builder("kafka.consumer.events.failed")
.tag("topic", "orders")
.register(meterRegistry);
// Timer đo độ trễ xử lý
this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
.tag("topic", "orders")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
}
@KafkaListener(topics = "${app.kafka.topics.orders}")
public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
processingService.process(event);
acknowledgment.acknowledge();
processedCounter.increment();
sample.stop(processingTimer);
} catch (Exception ex) {
failedCounter.increment();
sample.stop(processingTimer);
throw ex;
}
}
}Các metric này cho phép cấu hình cảnh báo về lag consumer, tỷ lệ lỗi và độ trễ xử lý — yếu tố thiết yếu để phát hiện vấn đề một cách chủ động.
Kết luận
Spring Kafka mang lại tích hợp vững chắc để xây dựng các kiến trúc event-driven có khả năng chịu lỗi. Việc nắm vững các cơ chế retry, dead letter queue và tính bất biến tạo nên nền tảng cho các ứng dụng sẵn sàng cho môi trường sản xuất.
Danh sách kiểm tra cho kiến trúc event-driven với Spring Kafka:
- ✅ Cấu hình commit offset thủ công với
AckMode.MANUAL - ✅ Dùng key phân vùng nhất quán để giữ thứ tự sự kiện
- ✅ Triển khai retry với backoff lũy thừa qua
DefaultErrorHandler - ✅ Định tuyến thông điệp lỗi vào Dead Letter Queue
- ✅ Bảo đảm tính bất biến phía consumer thông qua theo dõi eventId
- ✅ Cân nhắc mẫu Outbox để giữ nhất quán giữa cơ sở dữ liệu và Kafka
- ✅ Phơi bày metric qua Micrometer cho việc giám sát
- ✅ Điều chỉnh
MAX_POLL_INTERVAL_MStheo thời gian xử lý tối đa
Thẻ
Chia sẻ
Bài viết liên quan

Phỏng vấn Spring Cloud Gateway: Routing, Filter và Load Balancing
Làm chủ Spring Cloud Gateway cho phỏng vấn kỹ thuật: 12 câu hỏi về routing, filter, load balancing và mẫu API Gateway kèm ví dụ mã.

Logging Spring Boot năm 2026: log có cấu trúc trên production với Logback và JSON
Hướng dẫn đầy đủ về logging có cấu trúc trong Spring Boot. Cấu hình Logback JSON, MDC cho tracing, các best practice production và tích hợp ELK Stack.

Phỏng vấn Spring GraphQL: Resolver, DataLoader và Giải pháp cho Vấn đề N+1
Chuẩn bị cho phỏng vấn Spring GraphQL với hướng dẫn đầy đủ này. Resolver, DataLoader, xử lý vấn đề N+1, mutation và các thực hành tốt nhất cho câu hỏi kỹ thuật.