Spring Kafka: สถาปัตยกรรม event-driven พร้อม consumer ที่ทนทาน

คู่มือ Spring Kafka แบบครบถ้วนสำหรับสถาปัตยกรรม event-driven การตั้งค่า consumer ที่ทนทาน นโยบาย retry dead letter queue และรูปแบบโปรดักชันสำหรับแอปพลิเคชันแบบกระจาย

สถาปัตยกรรม event-driven ด้วย Spring Kafka และ consumer ที่ทนทาน

Apache Kafka กลายเป็นมาตรฐานโดยพฤตินัยสำหรับสถาปัตยกรรม event-driven ขนาดใหญ่ Spring Kafka ช่วยให้การผนวกรวมเข้ากับแอปพลิเคชัน Spring Boot ง่ายขึ้น พร้อมทั้งมอบกลไกความทนทานที่จำเป็นสำหรับสภาพแวดล้อมโปรดักชัน คู่มือนี้ลงลึกในเรื่องการกำหนดค่า รูปแบบการบริโภค และกลยุทธ์การจัดการข้อผิดพลาด

ข้อกำหนดเบื้องต้น

คู่มือนี้สมมุติว่าคุ้นเคยกับแนวคิดพื้นฐานของ Kafka แล้ว ได้แก่ topic, partition, consumer group และ offset โดยเน้นไปที่การผนวกรวมกับ Spring และรูปแบบความทนทาน

ทำไมจึงเลือกสถาปัตยกรรม event-driven?

สถาปัตยกรรม event-driven จะแยกองค์ประกอบของระบบออกจากกันด้วยเหตุการณ์แบบอะซิงโครนัส ต่างจากการเรียก REST แบบซิงโครนัส โปรดิวเซอร์จะปล่อยเหตุการณ์โดยไม่รอการตอบกลับ ทำให้ผู้บริโภคประมวลผลได้ตามจังหวะของตนเอง

แนวทางนี้มอบประโยชน์สำคัญหลายประการ ได้แก่ ความสามารถในการขยายตามแนวนอนแบบอิสระต่อบริการ ความทนทานที่สูงขึ้นต่อความล้มเหลวชั่วคราว และความสามารถในการตรวจสอบย้อนหลังครบถ้วนผ่านล็อกที่ไม่สามารถแก้ไขได้ของ Kafka

OrderEvent.javajava
public record OrderEvent(
    // ตัวระบุเหตุการณ์เฉพาะสำหรับ idempotence
    String eventId,
    // ประเภทของเหตุการณ์สำหรับการกำหนดเส้นทาง
    String eventType,
    // เวลาที่สร้าง
    Instant createdAt,
    // payload เชิงธุรกิจ
    OrderPayload payload
) {
    // เมธอด factory ที่รับประกันความเฉพาะตัวของ eventId
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

โครงสร้างของเหตุการณ์มีตัวระบุเฉพาะ ประเภท และเวลาเสมอ เมตาดาตาเหล่านี้ช่วยให้ทำการกรองที่ฝั่งผู้บริโภค ตรวจจับซ้ำซ้อน และติดตามตามลำดับเวลาได้

การกำหนดค่าพื้นฐานของ Spring Kafka

การผนวกรวมเริ่มต้นจาก starter spring-kafka และการกำหนดค่า YAML ขั้นต่ำ Spring Boot จะกำหนดค่า bean สำคัญให้อัตโนมัติ ได้แก่ KafkaTemplate สำหรับการส่ง และ ConcurrentKafkaListenerContainerFactory สำหรับการบริโภค

yaml
# application.yml
spring:
  kafka:
    # ที่อยู่โบรกเกอร์ Kafka (คลัสเตอร์)
    bootstrap-servers: localhost:9092

    # การกำหนดค่าโปรดิวเซอร์
    producer:
      # serialization คีย์แบบ String
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # serialization ค่าเป็น JSON
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # รอการตอบรับจากเรพลิกาทั้งหมด
      acks: all
      # จำนวนครั้งที่ลองใหม่เมื่อเครือข่ายล้มเหลว
      retries: 3

    # การกำหนดค่าผู้บริโภค
    consumer:
      # ตัวระบุของ consumer group
      group-id: order-service
      # deserialization ของคีย์
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # deserialization JSON พร้อมประเภทเป้าหมาย
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # ตำแหน่งเริ่มต้นเมื่อยังไม่มี offset บันทึก
      auto-offset-reset: earliest
      # ปิด auto-commit เพื่อควบคุมเอง
      enable-auto-commit: false
      properties:
        # แพ็กเกจที่เชื่อถือได้สำหรับ deserialization
        spring.json.trusted.packages: com.example.events

การปิด enable-auto-commit เป็นแนวปฏิบัติที่จำเป็นในโปรดักชัน การ commit offset แบบแมนนวลทำให้ข้อความจะถูกทำเครื่องหมายว่าประมวลผลแล้วก็ต่อเมื่อการประมวลผลเสร็จสิ้นจริง

การสร้างโปรดิวเซอร์ Kafka

KafkaTemplate ห่อหุ้มลอจิกการส่งไปยัง Kafka การ inject ตรงทำให้สามารถใช้งานได้ทันทีภายในบริการเชิงธุรกิจ

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // topic ปลายทาง (ดึงจากภายนอกการกำหนดค่า)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // สร้างเหตุการณ์พร้อมเมตาดาตา
        OrderEvent event = OrderEvent.created(payload);

        // ใช้ orderId เป็นคีย์ของ partition
        // รับประกันลำดับของเหตุการณ์สำหรับคำสั่งซื้อเดียวกัน
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // ส่งแบบอะซิงโครนัสพร้อม callback
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // สำเร็จ: ล็อกเมตาดาตาของการส่ง
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // ล้มเหลว: ล็อกข้อผิดพลาดเพื่อตรวจสอบ
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

การใช้คีย์ของ partition ที่อิงจากตัวระบุทางธุรกิจทำให้เหตุการณ์ทั้งหมดของ entity เดียวกันลงใน partition เดียวกัน จึงรักษาลำดับตามเวลาเอาไว้

คีย์ของ partition

คีย์ที่เป็น null จะกระจายข้อความแบบ round-robin ระหว่าง partition ซึ่งช่วยเพิ่มการประมวลผลแบบขนานสูงสุด แต่จะสูญเสียการรับประกันลำดับ การเลือกคีย์ขึ้นอยู่กับความต้องการทางธุรกิจ

ผู้บริโภคพื้นฐานด้วย @KafkaListener

แอนโนเตชัน @KafkaListener แปลงเมธอดให้กลายเป็นผู้บริโภค Kafka Spring จะจัดการลูป polling, deserialization และการ commit offset ให้อัตโนมัติ

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // topic ที่ต้องฟัง
        topics = "${app.kafka.topics.orders}",
        // consumer group
        groupId = "${spring.kafka.consumer.group-id}",
        // factory แบบกำหนดเองสำหรับการกำหนดค่าขั้นสูง
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // payload ที่ deserialize อัตโนมัติ
            OrderEvent event,
            // เมตาดาตา Kafka ที่ถูก inject
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // Acknowledgment สำหรับการ commit แบบแมนนวล
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // การประมวลผลทางธุรกิจ
            processingService.process(event);

            // commit offset เฉพาะเมื่อประมวลผลสำเร็จ
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // หากไม่มีการเรียก acknowledge() จะทำให้เกิดการประมวลซ้ำ
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

การ inject Acknowledgment ช่วยให้ควบคุมการ commit ได้อย่างชัดเจน หากไม่มีการเรียก acknowledge() offset จะยังไม่ถูก commit และข้อความจะถูกส่งซ้ำในการ poll ครั้งถัดไป

การกำหนดค่า ConsumerFactory ขั้นสูง

การกำหนดค่าเริ่มต้นเหมาะกับการพัฒนา แต่ต้องปรับเปลี่ยนเมื่อใช้งานจริง factory แบบกำหนดเองให้การควบคุมพฤติกรรมของผู้บริโภคได้อย่างละเอียด

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // การกำหนดค่าการเชื่อมต่อ
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // deserialization
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // การจัดการ offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // ประสิทธิภาพ: จำนวนเรคคอร์ดต่อหนึ่ง poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // session timeout เพื่อตรวจจับความล้มเหลว
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // ช่วงเวลา heartbeat (1/3 ของ session timeout)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // เวลาประมวลผลสูงสุดก่อน rebalance
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // การกำหนดค่าของ JSON deserializer
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // โหมด acknowledgment แบบแมนนวล
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // จำนวนเทรดของผู้บริโภค
        factory.setConcurrency(3);

        // ปิดการประมวลผลแบบแบตช์ (ครั้งละหนึ่งข้อความ)
        factory.setBatchListener(false);

        return factory;
    }
}

พารามิเตอร์ MAX_POLL_INTERVAL_MS_CONFIG กำหนดดีเลย์สูงสุดระหว่างการเรียก poll() หากเกินค่านี้ ผู้บริโภคจะถูกขับออกจากกลุ่มและเกิด rebalance ค่านี้ควรสะท้อนเวลาประมวลผลสูงสุดที่คาดไว้

พร้อมที่จะพิชิตการสัมภาษณ์ Spring Boot แล้วหรือยังครับ?

ฝึกฝนด้วยตัวจำลองแบบโต้ตอบ, flashcards และแบบทดสอบเทคนิคครับ

กลยุทธ์การลองใหม่ด้วย RetryTemplate

ข้อผิดพลาดชั่วคราว (บริการขาดหายไประยะสั้น เครือข่ายหมดเวลา) ต้องการการลองใหม่อัตโนมัติ Spring Kafka ผนวกกับ RetryTemplate เพื่อรองรับนโยบายการลองใหม่ขั้นสูง

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // นโยบายลองใหม่: สูงสุด 3 ครั้ง
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // backoff แบบเอกซ์โพเนนเชียล: 1 วินาที, 2 วินาที, 4 วินาที ระหว่างการลองใหม่
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // การกำหนดค่าการลองใหม่พร้อม recovery callback
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // recovery: การกระทำเมื่อความพยายามหมด
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // backoff แบบเอกซ์โพเนนเชียล: เริ่ม 1 วินาที สูงสุด 30 วินาที 3 ครั้ง
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

DefaultErrorHandler มาแทนที่ SeekToCurrentErrorHandler เดิมตั้งแต่ Spring Kafka 2.8 ทำให้ API ชัดเจนขึ้นและเปิดทางเลือกในการกำหนดค่าได้กว้างขึ้น

การใช้งาน Dead Letter Queue

หลังจากการลองใหม่หมดลง ข้อความที่ล้มเหลวควรถูกส่งต่อไปยัง Dead Letter Queue (DLQ) เพื่อนำไปวิเคราะห์ภายหลัง วิธีนี้ช่วยป้องกันการสูญหายของข้อมูลและคลายการอุดตันของผู้บริโภค

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // กลยุทธ์การตั้งชื่อ topic DLT: original-topic.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // topic DLT อิงจาก topic ต้นทาง
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // ErrorHandler พร้อม recovery แบบ DLT
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // ข้อยกเว้นที่ไม่ควรลองใหม่ (ส่งตรงไปยัง DLT)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

การแยกแยะข้อยกเว้นที่ลองใหม่ได้และไม่ควรลองใหม่จะช่วยปรับพฤติกรรมให้เหมาะสม ValidationException บ่งบอกถึงข้อมูลที่ผิดรูปแบบซึ่งการลองใหม่ก็ไม่อาจแก้ไขได้ การส่งตรงไป DLT จึงสมเหตุสมผล

ผู้บริโภค DLT สำหรับการประมวลซ้ำแบบแมนนวล

ผู้บริโภคเฉพาะทางจะคอยตรวจสอบ DLT และเปิดให้ประมวลซ้ำข้อความหลังจากแก้ไขสาเหตุที่แท้จริงแล้ว

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // จัดเก็บเพื่อการวิเคราะห์และประมวลซ้ำในภายหลัง
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // แจ้งเตือนเพื่อการแทรกแซงโดยมนุษย์
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
ส่วนหัวของ DLT

Spring Kafka จะเสริมข้อความ DLT โดยอัตโนมัติด้วยส่วนหัวที่บรรจุเมตาดาตาเกี่ยวกับความล้มเหลว ได้แก่ ข้อยกเว้น stacktrace topic เดิม partition และ offset ข้อมูลเหล่านี้ช่วยให้วินิจฉัยปัญหาได้สะดวก

การจัดการ idempotence ที่ฝั่งผู้บริโภค

Kafka รับประกันการส่งแบบ "at least once" หากเกิดข้อขัดข้องหลังการประมวลผลแต่ก่อนการ commit ข้อความอาจถูกส่งซ้ำได้ idempotence ที่ฝั่งผู้บริโภคจะช่วยป้องกันผลข้างเคียงจากการประมวลซ้ำ

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // ตรวจสอบ: เหตุการณ์นี้ถูกประมวลผลแล้วหรือยัง?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // การประมวลผลทางธุรกิจ
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // บันทึกการประมวลผลในทรานแซกชันเดียวกัน
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // ดัชนีบน eventId เพื่อประสิทธิภาพสูงสุด
    boolean existsByEventId(String eventId);

    // ลบเรคคอร์ดเก่า
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

การบันทึกการประมวลผลในทรานแซกชันเดียวกันกับลอจิกธุรกิจช่วยรับประกันความเป็นอะตอม หากเกิดความผิดพลาดหลังการประมวลผลแต่ก่อนการ commit ทั้งสองการดำเนินการจะถูกเล่นซ้ำอย่างสอดคล้องกัน

รูปแบบ Transactional Outbox

รูปแบบ Outbox ช่วยแก้ปัญหาความสอดคล้องระหว่างฐานข้อมูลและ Kafka แทนการเผยแพร่ตรงไปยัง Kafka เหตุการณ์จะถูกบันทึกลงในตาราง "outbox" ก่อน แล้วจึงถูกส่งต่อโดยกระบวนการเฉพาะ

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // สร้างคำสั่งซื้อ
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // สร้างเหตุการณ์ outbox ในทรานแซกชันเดียวกัน
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

ตัวรีเลย์ outbox จะเผยแพร่เหตุการณ์ไปยัง Kafka แบบอะซิงโครนัส:

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // ดึงเหตุการณ์ที่ค้างอยู่พร้อมล็อก
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // เผยแพร่ไปยัง Kafka
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // อัปเดตสถานะ
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

รูปแบบนี้รับประกันว่าเหตุการณ์จะถูกเผยแพร่ก็ต่อเมื่อทรานแซกชันทางธุรกิจสำเร็จ จึงขจัดความไม่สอดคล้องระหว่างฐานข้อมูลกับ Kafka

เริ่มฝึกซ้อมเลย!

ทดสอบความรู้ของคุณด้วยตัวจำลองสัมภาษณ์และแบบทดสอบเทคนิคครับ

การมอนิเตอร์และความสามารถในการสังเกตการณ์

การกำกับดูแลระบบ Kafka ต้องอาศัยตัววัดเรื่องดีเลย์ของการบริโภค lag และข้อผิดพลาด Spring Boot Actuator เปิดเผยตัววัดเหล่านี้ผ่าน Micrometer

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // ตัววัดแบบกำหนดเอง: เหตุการณ์ที่ประมวลผลแล้ว
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // ตัววัดแบบกำหนดเอง: เหตุการณ์ที่ล้มเหลว
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // เริ่มต้นตัวนับ
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // ตัวจับเวลาวัดดีเลย์ของการประมวลผล
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

ตัววัดเหล่านี้ช่วยให้ตั้งค่าการแจ้งเตือนเรื่อง lag ของผู้บริโภค อัตราข้อผิดพลาด และดีเลย์ของการประมวลผล ซึ่งเป็นสิ่งจำเป็นสำหรับการตรวจจับปัญหาเชิงรุก

สรุป

Spring Kafka มอบการผนวกรวมที่แข็งแกร่งสำหรับการสร้างสถาปัตยกรรม event-driven ที่ทนทาน การเชี่ยวชาญกลไกการลองใหม่ dead letter queue และ idempotence ถือเป็นรากฐานของแอปพลิเคชันที่พร้อมใช้งานในโปรดักชัน

เช็กลิสต์สถาปัตยกรรม event-driven ด้วย Spring Kafka:

  • ✅ ตั้งค่าการ commit offset แบบแมนนวลด้วย AckMode.MANUAL
  • ✅ ใช้คีย์ของ partition ที่สอดคล้องกันเพื่อรักษาลำดับเหตุการณ์
  • ✅ ปรับใช้การลองใหม่พร้อม backoff แบบเอกซ์โพเนนเชียลผ่าน DefaultErrorHandler
  • ✅ ส่งข้อความที่ล้มเหลวไปยัง Dead Letter Queue
  • ✅ รับประกัน idempotence ที่ฝั่งผู้บริโภคด้วยการติดตาม eventId
  • ✅ พิจารณารูปแบบ Outbox เพื่อความสอดคล้องระหว่างฐานข้อมูลกับ Kafka
  • ✅ เปิดเผยตัววัดผ่าน Micrometer เพื่อการมอนิเตอร์
  • ✅ ปรับ MAX_POLL_INTERVAL_MS ให้สอดคล้องกับเวลาประมวลผลสูงสุด

แท็ก

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

แชร์

บทความที่เกี่ยวข้อง