Spring Kafka: สถาปัตยกรรม event-driven พร้อม consumer ที่ทนทาน
คู่มือ Spring Kafka แบบครบถ้วนสำหรับสถาปัตยกรรม event-driven การตั้งค่า consumer ที่ทนทาน นโยบาย retry dead letter queue และรูปแบบโปรดักชันสำหรับแอปพลิเคชันแบบกระจาย

Apache Kafka กลายเป็นมาตรฐานโดยพฤตินัยสำหรับสถาปัตยกรรม event-driven ขนาดใหญ่ Spring Kafka ช่วยให้การผนวกรวมเข้ากับแอปพลิเคชัน Spring Boot ง่ายขึ้น พร้อมทั้งมอบกลไกความทนทานที่จำเป็นสำหรับสภาพแวดล้อมโปรดักชัน คู่มือนี้ลงลึกในเรื่องการกำหนดค่า รูปแบบการบริโภค และกลยุทธ์การจัดการข้อผิดพลาด
คู่มือนี้สมมุติว่าคุ้นเคยกับแนวคิดพื้นฐานของ Kafka แล้ว ได้แก่ topic, partition, consumer group และ offset โดยเน้นไปที่การผนวกรวมกับ Spring และรูปแบบความทนทาน
ทำไมจึงเลือกสถาปัตยกรรม event-driven?
สถาปัตยกรรม event-driven จะแยกองค์ประกอบของระบบออกจากกันด้วยเหตุการณ์แบบอะซิงโครนัส ต่างจากการเรียก REST แบบซิงโครนัส โปรดิวเซอร์จะปล่อยเหตุการณ์โดยไม่รอการตอบกลับ ทำให้ผู้บริโภคประมวลผลได้ตามจังหวะของตนเอง
แนวทางนี้มอบประโยชน์สำคัญหลายประการ ได้แก่ ความสามารถในการขยายตามแนวนอนแบบอิสระต่อบริการ ความทนทานที่สูงขึ้นต่อความล้มเหลวชั่วคราว และความสามารถในการตรวจสอบย้อนหลังครบถ้วนผ่านล็อกที่ไม่สามารถแก้ไขได้ของ Kafka
public record OrderEvent(
// ตัวระบุเหตุการณ์เฉพาะสำหรับ idempotence
String eventId,
// ประเภทของเหตุการณ์สำหรับการกำหนดเส้นทาง
String eventType,
// เวลาที่สร้าง
Instant createdAt,
// payload เชิงธุรกิจ
OrderPayload payload
) {
// เมธอด factory ที่รับประกันความเฉพาะตัวของ eventId
public static OrderEvent created(OrderPayload payload) {
return new OrderEvent(
UUID.randomUUID().toString(),
"ORDER_CREATED",
Instant.now(),
payload
);
}
}
public record OrderPayload(
Long orderId,
Long customerId,
List<OrderItem> items,
BigDecimal totalAmount
) {}โครงสร้างของเหตุการณ์มีตัวระบุเฉพาะ ประเภท และเวลาเสมอ เมตาดาตาเหล่านี้ช่วยให้ทำการกรองที่ฝั่งผู้บริโภค ตรวจจับซ้ำซ้อน และติดตามตามลำดับเวลาได้
การกำหนดค่าพื้นฐานของ Spring Kafka
การผนวกรวมเริ่มต้นจาก starter spring-kafka และการกำหนดค่า YAML ขั้นต่ำ Spring Boot จะกำหนดค่า bean สำคัญให้อัตโนมัติ ได้แก่ KafkaTemplate สำหรับการส่ง และ ConcurrentKafkaListenerContainerFactory สำหรับการบริโภค
# application.yml
spring:
kafka:
# ที่อยู่โบรกเกอร์ Kafka (คลัสเตอร์)
bootstrap-servers: localhost:9092
# การกำหนดค่าโปรดิวเซอร์
producer:
# serialization คีย์แบบ String
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# serialization ค่าเป็น JSON
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# รอการตอบรับจากเรพลิกาทั้งหมด
acks: all
# จำนวนครั้งที่ลองใหม่เมื่อเครือข่ายล้มเหลว
retries: 3
# การกำหนดค่าผู้บริโภค
consumer:
# ตัวระบุของ consumer group
group-id: order-service
# deserialization ของคีย์
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# deserialization JSON พร้อมประเภทเป้าหมาย
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# ตำแหน่งเริ่มต้นเมื่อยังไม่มี offset บันทึก
auto-offset-reset: earliest
# ปิด auto-commit เพื่อควบคุมเอง
enable-auto-commit: false
properties:
# แพ็กเกจที่เชื่อถือได้สำหรับ deserialization
spring.json.trusted.packages: com.example.eventsการปิด enable-auto-commit เป็นแนวปฏิบัติที่จำเป็นในโปรดักชัน การ commit offset แบบแมนนวลทำให้ข้อความจะถูกทำเครื่องหมายว่าประมวลผลแล้วก็ต่อเมื่อการประมวลผลเสร็จสิ้นจริง
การสร้างโปรดิวเซอร์ Kafka
KafkaTemplate ห่อหุ้มลอจิกการส่งไปยัง Kafka การ inject ตรงทำให้สามารถใช้งานได้ทันทีภายในบริการเชิงธุรกิจ
@Service
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
// topic ปลายทาง (ดึงจากภายนอกการกำหนดค่า)
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
OrderPayload payload) {
// สร้างเหตุการณ์พร้อมเมตาดาตา
OrderEvent event = OrderEvent.created(payload);
// ใช้ orderId เป็นคีย์ของ partition
// รับประกันลำดับของเหตุการณ์สำหรับคำสั่งซื้อเดียวกัน
String partitionKey = payload.orderId().toString();
log.info("Publishing ORDER_CREATED event: {} to topic: {}",
event.eventId(), ordersTopic);
// ส่งแบบอะซิงโครนัสพร้อม callback
return kafkaTemplate.send(ordersTopic, partitionKey, event)
.whenComplete((result, ex) -> {
if (ex == null) {
// สำเร็จ: ล็อกเมตาดาตาของการส่ง
RecordMetadata metadata = result.getRecordMetadata();
log.info("Event sent successfully: topic={}, partition={}, offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
// ล้มเหลว: ล็อกข้อผิดพลาดเพื่อตรวจสอบ
log.error("Failed to send event: {}", event.eventId(), ex);
}
});
}
}การใช้คีย์ของ partition ที่อิงจากตัวระบุทางธุรกิจทำให้เหตุการณ์ทั้งหมดของ entity เดียวกันลงใน partition เดียวกัน จึงรักษาลำดับตามเวลาเอาไว้
คีย์ที่เป็น null จะกระจายข้อความแบบ round-robin ระหว่าง partition ซึ่งช่วยเพิ่มการประมวลผลแบบขนานสูงสุด แต่จะสูญเสียการรับประกันลำดับ การเลือกคีย์ขึ้นอยู่กับความต้องการทางธุรกิจ
ผู้บริโภคพื้นฐานด้วย @KafkaListener
แอนโนเตชัน @KafkaListener แปลงเมธอดให้กลายเป็นผู้บริโภค Kafka Spring จะจัดการลูป polling, deserialization และการ commit offset ให้อัตโนมัติ
@Service
@Slf4j
public class OrderEventConsumer {
private final OrderProcessingService processingService;
public OrderEventConsumer(OrderProcessingService processingService) {
this.processingService = processingService;
}
@KafkaListener(
// topic ที่ต้องฟัง
topics = "${app.kafka.topics.orders}",
// consumer group
groupId = "${spring.kafka.consumer.group-id}",
// factory แบบกำหนดเองสำหรับการกำหนดค่าขั้นสูง
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
// payload ที่ deserialize อัตโนมัติ
OrderEvent event,
// เมตาดาตา Kafka ที่ถูก inject
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
// Acknowledgment สำหรับการ commit แบบแมนนวล
Acknowledgment acknowledgment) {
log.info("Received event: type={}, partition={}, offset={}",
event.eventType(), partition, offset);
try {
// การประมวลผลทางธุรกิจ
processingService.process(event);
// commit offset เฉพาะเมื่อประมวลผลสำเร็จ
acknowledgment.acknowledge();
log.info("Event processed successfully: {}", event.eventId());
} catch (Exception ex) {
// หากไม่มีการเรียก acknowledge() จะทำให้เกิดการประมวลซ้ำ
log.error("Failed to process event: {}", event.eventId(), ex);
throw ex;
}
}
}การ inject Acknowledgment ช่วยให้ควบคุมการ commit ได้อย่างชัดเจน หากไม่มีการเรียก acknowledge() offset จะยังไม่ถูก commit และข้อความจะถูกส่งซ้ำในการ poll ครั้งถัดไป
การกำหนดค่า ConsumerFactory ขั้นสูง
การกำหนดค่าเริ่มต้นเหมาะกับการพัฒนา แต่ต้องปรับเปลี่ยนเมื่อใช้งานจริง factory แบบกำหนดเองให้การควบคุมพฤติกรรมของผู้บริโภคได้อย่างละเอียด
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, OrderEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// การกำหนดค่าการเชื่อมต่อ
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// deserialization
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// การจัดการ offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// ประสิทธิภาพ: จำนวนเรคคอร์ดต่อหนึ่ง poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// session timeout เพื่อตรวจจับความล้มเหลว
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// ช่วงเวลา heartbeat (1/3 ของ session timeout)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
// เวลาประมวลผลสูงสุดก่อน rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// การกำหนดค่าของ JSON deserializer
JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
deserializer.addTrustedPackages("com.example.events");
deserializer.setUseTypeMapperForKey(false);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
deserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// โหมด acknowledgment แบบแมนนวล
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// จำนวนเทรดของผู้บริโภค
factory.setConcurrency(3);
// ปิดการประมวลผลแบบแบตช์ (ครั้งละหนึ่งข้อความ)
factory.setBatchListener(false);
return factory;
}
}พารามิเตอร์ MAX_POLL_INTERVAL_MS_CONFIG กำหนดดีเลย์สูงสุดระหว่างการเรียก poll() หากเกินค่านี้ ผู้บริโภคจะถูกขับออกจากกลุ่มและเกิด rebalance ค่านี้ควรสะท้อนเวลาประมวลผลสูงสุดที่คาดไว้
พร้อมที่จะพิชิตการสัมภาษณ์ Spring Boot แล้วหรือยังครับ?
ฝึกฝนด้วยตัวจำลองแบบโต้ตอบ, flashcards และแบบทดสอบเทคนิคครับ
กลยุทธ์การลองใหม่ด้วย RetryTemplate
ข้อผิดพลาดชั่วคราว (บริการขาดหายไประยะสั้น เครือข่ายหมดเวลา) ต้องการการลองใหม่อัตโนมัติ Spring Kafka ผนวกกับ RetryTemplate เพื่อรองรับนโยบายการลองใหม่ขั้นสูง
@Configuration
public class KafkaRetryConfig {
@Bean
public RetryTemplate kafkaRetryTemplate() {
RetryTemplate template = new RetryTemplate();
// นโยบายลองใหม่: สูงสุด 3 ครั้ง
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
template.setRetryPolicy(retryPolicy);
// backoff แบบเอกซ์โพเนนเชียล: 1 วินาที, 2 วินาที, 4 วินาที ระหว่างการลองใหม่
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
retryableKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
RetryTemplate kafkaRetryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// การกำหนดค่าการลองใหม่พร้อม recovery callback
factory.setCommonErrorHandler(
new DefaultErrorHandler(
// recovery: การกระทำเมื่อความพยายามหมด
(record, exception) -> {
log.error("All retries exhausted for record: key={}, value={}",
record.key(), record.value(), exception);
},
// backoff แบบเอกซ์โพเนนเชียล: เริ่ม 1 วินาที สูงสุด 30 วินาที 3 ครั้ง
new ExponentialBackOff(1000L, 2.0)
)
);
return factory;
}
}DefaultErrorHandler มาแทนที่ SeekToCurrentErrorHandler เดิมตั้งแต่ Spring Kafka 2.8 ทำให้ API ชัดเจนขึ้นและเปิดทางเลือกในการกำหนดค่าได้กว้างขึ้น
การใช้งาน Dead Letter Queue
หลังจากการลองใหม่หมดลง ข้อความที่ล้มเหลวควรถูกส่งต่อไปยัง Dead Letter Queue (DLQ) เพื่อนำไปวิเคราะห์ภายหลัง วิธีนี้ช่วยป้องกันการสูญหายของข้อมูลและคลายการอุดตันของผู้บริโภค
@Configuration
public class DeadLetterConfig {
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
KafkaTemplate<String, Object> kafkaTemplate) {
// กลยุทธ์การตั้งชื่อ topic DLT: original-topic.DLT
return new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, exception) -> {
// topic DLT อิงจาก topic ต้นทาง
String dltTopic = record.topic() + ".DLT";
log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
dltTopic, record.key(), exception.getMessage());
return new TopicPartition(dltTopic, record.partition());
}
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
dltKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// ErrorHandler พร้อม recovery แบบ DLT
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
deadLetterPublishingRecoverer,
new FixedBackOff(1000L, 3)
);
// ข้อยกเว้นที่ไม่ควรลองใหม่ (ส่งตรงไปยัง DLT)
errorHandler.addNotRetryableExceptions(
ValidationException.class,
JsonParseException.class,
NullPointerException.class
);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}การแยกแยะข้อยกเว้นที่ลองใหม่ได้และไม่ควรลองใหม่จะช่วยปรับพฤติกรรมให้เหมาะสม ValidationException บ่งบอกถึงข้อมูลที่ผิดรูปแบบซึ่งการลองใหม่ก็ไม่อาจแก้ไขได้ การส่งตรงไป DLT จึงสมเหตุสมผล
ผู้บริโภค DLT สำหรับการประมวลซ้ำแบบแมนนวล
ผู้บริโภคเฉพาะทางจะคอยตรวจสอบ DLT และเปิดให้ประมวลซ้ำข้อความหลังจากแก้ไขสาเหตุที่แท้จริงแล้ว
@Service
@Slf4j
public class DeadLetterConsumer {
private final AlertingService alertingService;
private final FailedEventRepository failedEventRepository;
public DeadLetterConsumer(AlertingService alertingService,
FailedEventRepository failedEventRepository) {
this.alertingService = alertingService;
this.failedEventRepository = failedEventRepository;
}
@KafkaListener(
topics = "${app.kafka.topics.orders}.DLT",
groupId = "order-service-dlt"
)
public void handleDeadLetter(
ConsumerRecord<String, OrderEvent> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
@Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
@Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
@Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
Acknowledgment acknowledgment) {
log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
originalTopic, originalPartition, originalOffset, exceptionMessage);
// จัดเก็บเพื่อการวิเคราะห์และประมวลซ้ำในภายหลัง
FailedEvent failedEvent = FailedEvent.builder()
.eventId(record.value().eventId())
.eventType(record.value().eventType())
.payload(serializePayload(record.value()))
.originalTopic(originalTopic)
.originalPartition(originalPartition)
.originalOffset(originalOffset)
.exceptionMessage(exceptionMessage)
.stacktrace(stacktrace)
.status(FailedEventStatus.PENDING)
.createdAt(Instant.now())
.build();
failedEventRepository.save(failedEvent);
// แจ้งเตือนเพื่อการแทรกแซงโดยมนุษย์
alertingService.notifyDeadLetter(failedEvent);
acknowledgment.acknowledge();
}
}Spring Kafka จะเสริมข้อความ DLT โดยอัตโนมัติด้วยส่วนหัวที่บรรจุเมตาดาตาเกี่ยวกับความล้มเหลว ได้แก่ ข้อยกเว้น stacktrace topic เดิม partition และ offset ข้อมูลเหล่านี้ช่วยให้วินิจฉัยปัญหาได้สะดวก
การจัดการ idempotence ที่ฝั่งผู้บริโภค
Kafka รับประกันการส่งแบบ "at least once" หากเกิดข้อขัดข้องหลังการประมวลผลแต่ก่อนการ commit ข้อความอาจถูกส่งซ้ำได้ idempotence ที่ฝั่งผู้บริโภคจะช่วยป้องกันผลข้างเคียงจากการประมวลซ้ำ
@Service
@Slf4j
public class IdempotentOrderProcessor {
private final ProcessedEventRepository processedEventRepository;
private final OrderService orderService;
public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
OrderService orderService) {
this.processedEventRepository = processedEventRepository;
this.orderService = orderService;
}
@Transactional
public void processIdempotent(OrderEvent event) {
String eventId = event.eventId();
// ตรวจสอบ: เหตุการณ์นี้ถูกประมวลผลแล้วหรือยัง?
if (processedEventRepository.existsByEventId(eventId)) {
log.info("Event already processed, skipping: {}", eventId);
return;
}
// การประมวลผลทางธุรกิจ
switch (event.eventType()) {
case "ORDER_CREATED" -> orderService.createOrder(event.payload());
case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
default -> log.warn("Unknown event type: {}", event.eventType());
}
// บันทึกการประมวลผลในทรานแซกชันเดียวกัน
ProcessedEvent processed = ProcessedEvent.builder()
.eventId(eventId)
.eventType(event.eventType())
.processedAt(Instant.now())
.build();
processedEventRepository.save(processed);
log.info("Event processed and recorded: {}", eventId);
}
}@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {
// ดัชนีบน eventId เพื่อประสิทธิภาพสูงสุด
boolean existsByEventId(String eventId);
// ลบเรคคอร์ดเก่า
@Modifying
@Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
int deleteOlderThan(@Param("before") Instant before);
}การบันทึกการประมวลผลในทรานแซกชันเดียวกันกับลอจิกธุรกิจช่วยรับประกันความเป็นอะตอม หากเกิดความผิดพลาดหลังการประมวลผลแต่ก่อนการ commit ทั้งสองการดำเนินการจะถูกเล่นซ้ำอย่างสอดคล้องกัน
รูปแบบ Transactional Outbox
รูปแบบ Outbox ช่วยแก้ปัญหาความสอดคล้องระหว่างฐานข้อมูลและ Kafka แทนการเผยแพร่ตรงไปยัง Kafka เหตุการณ์จะถูกบันทึกลงในตาราง "outbox" ก่อน แล้วจึงถูกส่งต่อโดยกระบวนการเฉพาะ
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String eventId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String eventType;
@Column(columnDefinition = "TEXT", nullable = false)
private String payload;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private OutboxStatus status;
@Column(nullable = false)
private Instant createdAt;
private Instant publishedAt;
}
public enum OutboxStatus {
PENDING, PUBLISHED, FAILED
}@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
public OrderService(OrderRepository orderRepository,
OutboxRepository outboxRepository,
ObjectMapper objectMapper) {
this.orderRepository = orderRepository;
this.outboxRepository = outboxRepository;
this.objectMapper = objectMapper;
}
public Order createOrder(CreateOrderRequest request) {
// สร้างคำสั่งซื้อ
Order order = Order.builder()
.customerId(request.customerId())
.items(request.items())
.totalAmount(calculateTotal(request.items()))
.status(OrderStatus.CREATED)
.createdAt(Instant.now())
.build();
Order savedOrder = orderRepository.save(order);
// สร้างเหตุการณ์ outbox ในทรานแซกชันเดียวกัน
OrderEvent event = OrderEvent.created(toPayload(savedOrder));
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(event.eventId())
.aggregateType("Order")
.aggregateId(savedOrder.getId().toString())
.eventType(event.eventType())
.payload(serialize(event))
.status(OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxRepository.save(outboxEvent);
return savedOrder;
}
private String serialize(OrderEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize event", e);
}
}
}ตัวรีเลย์ outbox จะเผยแพร่เหตุการณ์ไปยัง Kafka แบบอะซิงโครนัส:
@Component
@Slf4j
public class OutboxRelayScheduler {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OutboxRelayScheduler(OutboxRepository outboxRepository,
KafkaTemplate<String, String> kafkaTemplate) {
this.outboxRepository = outboxRepository;
this.kafkaTemplate = kafkaTemplate;
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void relayPendingEvents() {
// ดึงเหตุการณ์ที่ค้างอยู่พร้อมล็อก
List<OutboxEvent> pendingEvents = outboxRepository
.findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
for (OutboxEvent event : pendingEvents) {
try {
// เผยแพร่ไปยัง Kafka
kafkaTemplate.send(
ordersTopic,
event.getAggregateId(),
event.getPayload()
).get(5, TimeUnit.SECONDS);
// อัปเดตสถานะ
event.setStatus(OutboxStatus.PUBLISHED);
event.setPublishedAt(Instant.now());
outboxRepository.save(event);
log.debug("Outbox event published: {}", event.getEventId());
} catch (Exception ex) {
log.error("Failed to publish outbox event: {}",
event.getEventId(), ex);
event.setStatus(OutboxStatus.FAILED);
outboxRepository.save(event);
}
}
}
}รูปแบบนี้รับประกันว่าเหตุการณ์จะถูกเผยแพร่ก็ต่อเมื่อทรานแซกชันทางธุรกิจสำเร็จ จึงขจัดความไม่สอดคล้องระหว่างฐานข้อมูลกับ Kafka
เริ่มฝึกซ้อมเลย!
ทดสอบความรู้ของคุณด้วยตัวจำลองสัมภาษณ์และแบบทดสอบเทคนิคครับ
การมอนิเตอร์และความสามารถในการสังเกตการณ์
การกำกับดูแลระบบ Kafka ต้องอาศัยตัววัดเรื่องดีเลย์ของการบริโภค lag และข้อผิดพลาด Spring Boot Actuator เปิดเผยตัววัดเหล่านี้ผ่าน Micrometer
@Configuration
public class KafkaMetricsConfig {
@Bean
public MeterBinder kafkaConsumerMetrics(
ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
return registry -> {
// ตัววัดแบบกำหนดเอง: เหตุการณ์ที่ประมวลผลแล้ว
Counter.builder("kafka.consumer.events.processed")
.description("Number of events successfully processed")
.tag("topic", "orders")
.register(registry);
// ตัววัดแบบกำหนดเอง: เหตุการณ์ที่ล้มเหลว
Counter.builder("kafka.consumer.events.failed")
.description("Number of events that failed processing")
.tag("topic", "orders")
.register(registry);
};
}
}@Service
@Slf4j
public class InstrumentedOrderConsumer {
private final OrderProcessingService processingService;
private final MeterRegistry meterRegistry;
private final Counter processedCounter;
private final Counter failedCounter;
private final Timer processingTimer;
public InstrumentedOrderConsumer(OrderProcessingService processingService,
MeterRegistry meterRegistry) {
this.processingService = processingService;
this.meterRegistry = meterRegistry;
// เริ่มต้นตัวนับ
this.processedCounter = Counter.builder("kafka.consumer.events.processed")
.tag("topic", "orders")
.register(meterRegistry);
this.failedCounter = Counter.builder("kafka.consumer.events.failed")
.tag("topic", "orders")
.register(meterRegistry);
// ตัวจับเวลาวัดดีเลย์ของการประมวลผล
this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
.tag("topic", "orders")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
}
@KafkaListener(topics = "${app.kafka.topics.orders}")
public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
processingService.process(event);
acknowledgment.acknowledge();
processedCounter.increment();
sample.stop(processingTimer);
} catch (Exception ex) {
failedCounter.increment();
sample.stop(processingTimer);
throw ex;
}
}
}ตัววัดเหล่านี้ช่วยให้ตั้งค่าการแจ้งเตือนเรื่อง lag ของผู้บริโภค อัตราข้อผิดพลาด และดีเลย์ของการประมวลผล ซึ่งเป็นสิ่งจำเป็นสำหรับการตรวจจับปัญหาเชิงรุก
สรุป
Spring Kafka มอบการผนวกรวมที่แข็งแกร่งสำหรับการสร้างสถาปัตยกรรม event-driven ที่ทนทาน การเชี่ยวชาญกลไกการลองใหม่ dead letter queue และ idempotence ถือเป็นรากฐานของแอปพลิเคชันที่พร้อมใช้งานในโปรดักชัน
เช็กลิสต์สถาปัตยกรรม event-driven ด้วย Spring Kafka:
- ✅ ตั้งค่าการ commit offset แบบแมนนวลด้วย
AckMode.MANUAL - ✅ ใช้คีย์ของ partition ที่สอดคล้องกันเพื่อรักษาลำดับเหตุการณ์
- ✅ ปรับใช้การลองใหม่พร้อม backoff แบบเอกซ์โพเนนเชียลผ่าน
DefaultErrorHandler - ✅ ส่งข้อความที่ล้มเหลวไปยัง Dead Letter Queue
- ✅ รับประกัน idempotence ที่ฝั่งผู้บริโภคด้วยการติดตาม eventId
- ✅ พิจารณารูปแบบ Outbox เพื่อความสอดคล้องระหว่างฐานข้อมูลกับ Kafka
- ✅ เปิดเผยตัววัดผ่าน Micrometer เพื่อการมอนิเตอร์
- ✅ ปรับ
MAX_POLL_INTERVAL_MSให้สอดคล้องกับเวลาประมวลผลสูงสุด
แท็ก
แชร์
บทความที่เกี่ยวข้อง

สัมภาษณ์ Spring Cloud Gateway: Routing, Filter และ Load Balancing
เชี่ยวชาญ Spring Cloud Gateway สำหรับการสัมภาษณ์เทคนิค: 12 คำถามครอบคลุม routing, filter, load balancing และ pattern API Gateway พร้อมตัวอย่างโค้ด.

Spring Boot Logging ในปี 2026: ล็อกแบบมีโครงสร้างสำหรับโปรดักชันด้วย Logback และ JSON
คู่มือฉบับสมบูรณ์สำหรับ structured logging ใน Spring Boot การตั้งค่า Logback JSON, MDC สำหรับ tracing แนวปฏิบัติที่ดีที่สุดในโปรดักชัน และการรวมกับ ELK Stack

สัมภาษณ์ Spring GraphQL: Resolver, DataLoader และวิธีแก้ปัญหา N+1
เตรียมตัวสำหรับการสัมภาษณ์ Spring GraphQL ด้วยคู่มือที่ครบถ้วนนี้ Resolver, DataLoader, การจัดการปัญหา N+1, mutation และแนวปฏิบัติที่ดีที่สุดสำหรับคำถามทางเทคนิค