Spring Kafka: arsitektur event-driven dengan consumer yang resilien

Panduan lengkap Spring Kafka untuk arsitektur event-driven. Konfigurasi, consumer resilien, kebijakan retry, dead letter queue, dan pola produksi untuk aplikasi terdistribusi.

Arsitektur event-driven dengan Spring Kafka dan consumer resilien

Apache Kafka telah menjadi standar de facto untuk arsitektur event-driven berskala besar. Spring Kafka menyederhanakan integrasinya ke dalam aplikasi Spring Boot sekaligus menyediakan mekanisme resiliensi yang esensial untuk lingkungan produksi. Panduan ini membahas secara mendalam konfigurasi, pola konsumsi, dan strategi penanganan kesalahan.

Prasyarat

Panduan ini mengasumsikan keakraban dengan konsep dasar Kafka: topic, partition, consumer group, dan offset. Fokusnya adalah integrasi Spring serta pola resiliensi.

Mengapa memilih arsitektur event-driven?

Arsitektur event-driven memisahkan komponen sistem melalui peristiwa asinkron. Berbeda dengan panggilan REST sinkron, producer mengirim peristiwa tanpa menunggu balasan, sehingga consumer dapat memprosesnya dengan kecepatannya sendiri.

Pendekatan ini memberikan beberapa keuntungan penting: skalabilitas horizontal mandiri per layanan, resiliensi yang lebih tinggi terhadap kegagalan sementara, serta keterlacakan menyeluruh melalui log Kafka yang tidak dapat diubah.

OrderEvent.javajava
public record OrderEvent(
    // Identifier unik peristiwa untuk idempotensi
    String eventId,
    // Jenis peristiwa untuk routing
    String eventType,
    // Stempel waktu pembuatan
    Instant createdAt,
    // Payload bisnis
    OrderPayload payload
) {
    // Factory method yang menjamin keunikan eventId
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

Struktur peristiwa secara sistematis menyertakan identifier unik, tipe, dan stempel waktu. Metadata ini memungkinkan penyaringan di sisi consumer, pendeteksian duplikat, dan pelacakan temporal.

Konfigurasi dasar Spring Kafka

Integrasi dimulai dengan starter spring-kafka dan konfigurasi YAML minimal. Spring Boot mengonfigurasi bean-bean penting secara otomatis: KafkaTemplate untuk produksi dan ConcurrentKafkaListenerContainerFactory untuk konsumsi.

yaml
# application.yml
spring:
  kafka:
    # Alamat broker Kafka (cluster)
    bootstrap-servers: localhost:9092

    # Konfigurasi producer
    producer:
      # Serialisasi key bertipe String
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serialisasi value JSON
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Menunggu konfirmasi seluruh replika
      acks: all
      # Jumlah percobaan ulang saat gagal jaringan
      retries: 3

    # Konfigurasi consumer
    consumer:
      # Identifier consumer group
      group-id: order-service
      # Deserialisasi key
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserialisasi JSON dengan tipe target
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Posisi awal jika belum ada offset tercatat
      auto-offset-reset: earliest
      # Nonaktifkan auto-commit untuk kontrol manual
      enable-auto-commit: false
      properties:
        # Paket terpercaya untuk deserialisasi
        spring.json.trusted.packages: com.example.events

Menonaktifkan enable-auto-commit adalah praktik produksi yang esensial. Commit offset manual menjamin pesan hanya ditandai sebagai diproses setelah pemrosesan benar-benar selesai.

Membuat producer Kafka

KafkaTemplate mengenkapsulasi logika pengiriman ke Kafka. Injeksi langsungnya memungkinkan penggunaan instan di dalam layanan bisnis.

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Topic tujuan (dieksternalkan dalam konfigurasi)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // Pembuatan peristiwa beserta metadatanya
        OrderEvent event = OrderEvent.created(payload);

        // Penggunaan orderId sebagai partition key
        // Menjamin urutan peristiwa untuk pesanan yang sama
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // Pengiriman asinkron dengan callback
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Sukses: log metadata pengiriman
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // Gagal: log kesalahan untuk investigasi
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

Penggunaan partition key berbasis identifier bisnis memastikan seluruh peristiwa dari entitas yang sama jatuh ke partition yang sama, sehingga urutan kronologisnya tetap terjaga.

Partition key

Key bernilai null mendistribusikan pesan secara round-robin ke seluruh partition. Cara ini memaksimalkan paralelisme tetapi kehilangan jaminan urutan. Pemilihan key bergantung pada kebutuhan bisnis.

Consumer dasar dengan @KafkaListener

Anotasi @KafkaListener mengubah sebuah metode menjadi consumer Kafka. Spring secara otomatis mengelola loop polling, deserialisasi, dan commit offset.

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // Topic yang didengarkan
        topics = "${app.kafka.topics.orders}",
        // Consumer group
        groupId = "${spring.kafka.consumer.group-id}",
        // Factory kustom untuk konfigurasi lanjutan
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // Payload yang dideserialisasi otomatis
            OrderEvent event,
            // Metadata Kafka yang disuntikkan
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // Acknowledgment untuk commit manual
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // Pemrosesan bisnis
            processingService.process(event);

            // Commit offset hanya setelah pemrosesan berhasil
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // Tidak memanggil acknowledge() menyebabkan pemrosesan ulang
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

Menyuntikkan Acknowledgment memungkinkan kontrol commit yang eksplisit. Tanpa pemanggilan acknowledge(), offset tetap belum di-commit dan pesan akan dikirim ulang pada poll berikutnya.

Konfigurasi ConsumerFactory tingkat lanjut

Konfigurasi default sudah memadai untuk pengembangan tetapi memerlukan penyesuaian untuk produksi. Factory kustom memberikan kendali halus atas perilaku consumer.

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Konfigurasi koneksi
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Deserialisasi
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // Pengelolaan offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Performa: jumlah record per poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // Timeout sesi untuk deteksi kegagalan
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Interval heartbeat (1/3 dari timeout sesi)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // Waktu maksimum pemrosesan sebelum rebalance
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // Konfigurasi deserializer JSON
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // Mode acknowledgment manual
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Jumlah thread consumer
        factory.setConcurrency(3);

        // Pemrosesan batch dimatikan (satu pesan setiap kali)
        factory.setBatchListener(false);

        return factory;
    }
}

Parameter MAX_POLL_INTERVAL_MS_CONFIG menentukan jeda maksimum antara pemanggilan poll(). Jika dilampaui, consumer akan dikeluarkan dari grup dan terjadi rebalance. Nilai ini harus mencerminkan waktu pemrosesan maksimum yang diharapkan.

Siap menguasai wawancara Spring Boot Anda?

Berlatih dengan simulator interaktif, flashcards, dan tes teknis kami.

Strategi retry dengan RetryTemplate

Kesalahan sementara (ketidaktersediaan layanan singkat, timeout jaringan) memerlukan percobaan ulang otomatis. Spring Kafka berintegrasi dengan RetryTemplate untuk menerapkan kebijakan retry yang canggih.

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Kebijakan retry: maksimum 3 percobaan
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Backoff eksponensial: 1d, 2d, 4d antar percobaan
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Konfigurasi retry dengan callback recovery
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // Recovery: tindakan setelah percobaan habis
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // Backoff eksponensial: awal 1d, maks 30d, 3 percobaan
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

DefaultErrorHandler menggantikan SeekToCurrentErrorHandler lama sejak Spring Kafka 2.8. API-nya lebih jelas dan opsinya lebih lengkap.

Mengimplementasikan Dead Letter Queue

Setelah percobaan habis, pesan yang gagal sebaiknya dialihkan ke Dead Letter Queue (DLQ) untuk dianalisis kemudian. Pendekatan ini menghindari kehilangan data sekaligus melepaskan consumer dari kebuntuan.

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Strategi penamaan topic DLT: original-topic.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // Topic DLT berbasis topic asal
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // ErrorHandler dengan recovery DLT
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // Exception yang tidak boleh diulang (langsung ke DLT)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

Memisahkan exception yang dapat diulang dengan yang tidak akan mengoptimalkan perilaku. ValidationException menandakan data yang salah bentuk dan tidak akan terbenahi dengan retry, sehingga pengalihan langsung ke DLT layak dilakukan.

Consumer DLT untuk pemrosesan ulang manual

Consumer khusus memantau DLT dan memungkinkan pesan diproses ulang setelah masalah dasarnya diperbaiki.

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // Persistensi untuk analisis dan pemrosesan ulang nantinya
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // Peringatan untuk intervensi manusia
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
Header DLT

Spring Kafka secara otomatis memperkaya pesan DLT dengan header berisi metadata kegagalan: exception, stacktrace, topic asal, partition, dan offset. Informasi ini mempermudah diagnosis.

Penanganan idempotensi di sisi consumer

Kafka menjamin pengiriman "at least once": sebuah pesan dapat dikirim beberapa kali jika terjadi crash setelah pemrosesan tetapi sebelum commit. Idempotensi di sisi consumer mencegah efek samping dari pemrosesan ulang.

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // Pengecekan: peristiwa sudah diproses?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // Pemrosesan bisnis
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // Mencatat pemrosesan dalam transaksi yang sama
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // Indeks pada eventId untuk performa optimal
    boolean existsByEventId(String eventId);

    // Pembersihan record lama
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

Mencatat pemrosesan dalam transaksi yang sama dengan logika bisnis menjamin atomisitas. Crash setelah pemrosesan tetapi sebelum commit akan memutar ulang kedua operasi secara konsisten.

Pola Transactional Outbox

Pola Outbox menyelesaikan masalah konsistensi antara basis data dan Kafka. Alih-alih mempublikasikan langsung ke Kafka, peristiwa pertama-tama dipersistensi di tabel "outbox" lalu diteruskan oleh proses khusus.

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // Membuat pesanan
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // Membuat peristiwa outbox dalam transaksi yang sama
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

Relayer outbox mempublikasikan peristiwa ke Kafka secara asinkron:

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // Mengambil peristiwa pending dengan lock
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // Mempublikasikan ke Kafka
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // Memperbarui status
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

Pola ini memastikan sebuah peristiwa hanya dipublikasikan jika transaksi bisnis berhasil, sehingga inkonsistensi antara basis data dan Kafka dihilangkan.

Mulai berlatih!

Uji pengetahuan Anda dengan simulator wawancara dan tes teknis kami.

Pemantauan dan observabilitas

Mengawasi sistem Kafka memerlukan metrik tentang latensi konsumsi, lag, dan kesalahan. Spring Boot Actuator memaparkan metrik-metrik ini melalui Micrometer.

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // Metrik kustom: peristiwa yang berhasil diproses
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // Metrik kustom: peristiwa yang gagal
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // Inisialisasi counter
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // Timer untuk mengukur latensi pemrosesan
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

Metrik ini memungkinkan konfigurasi alert pada lag consumer, tingkat kesalahan, dan latensi pemrosesan, yang esensial untuk mendeteksi masalah secara proaktif.

Kesimpulan

Spring Kafka menyediakan integrasi yang andal untuk membangun arsitektur event-driven yang resilien. Menguasai mekanisme retry, dead letter queue, dan idempotensi menjadi pondasi aplikasi yang siap produksi.

Checklist arsitektur event-driven dengan Spring Kafka:

  • ✅ Mengonfigurasi commit offset manual dengan AckMode.MANUAL
  • ✅ Menggunakan partition key yang konsisten untuk mempertahankan urutan peristiwa
  • ✅ Menerapkan retry dengan backoff eksponensial melalui DefaultErrorHandler
  • ✅ Mengarahkan pesan yang gagal ke Dead Letter Queue
  • ✅ Menjamin idempotensi di sisi consumer dengan pelacakan eventId
  • ✅ Mempertimbangkan pola Outbox untuk konsistensi basis data dan Kafka
  • ✅ Memaparkan metrik melalui Micrometer untuk pemantauan
  • ✅ Menyetel MAX_POLL_INTERVAL_MS sesuai waktu pemrosesan maksimum

Tag

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

Bagikan

Artikel terkait