Spring Kafka: arsitektur event-driven dengan consumer yang resilien
Panduan lengkap Spring Kafka untuk arsitektur event-driven. Konfigurasi, consumer resilien, kebijakan retry, dead letter queue, dan pola produksi untuk aplikasi terdistribusi.

Apache Kafka telah menjadi standar de facto untuk arsitektur event-driven berskala besar. Spring Kafka menyederhanakan integrasinya ke dalam aplikasi Spring Boot sekaligus menyediakan mekanisme resiliensi yang esensial untuk lingkungan produksi. Panduan ini membahas secara mendalam konfigurasi, pola konsumsi, dan strategi penanganan kesalahan.
Panduan ini mengasumsikan keakraban dengan konsep dasar Kafka: topic, partition, consumer group, dan offset. Fokusnya adalah integrasi Spring serta pola resiliensi.
Mengapa memilih arsitektur event-driven?
Arsitektur event-driven memisahkan komponen sistem melalui peristiwa asinkron. Berbeda dengan panggilan REST sinkron, producer mengirim peristiwa tanpa menunggu balasan, sehingga consumer dapat memprosesnya dengan kecepatannya sendiri.
Pendekatan ini memberikan beberapa keuntungan penting: skalabilitas horizontal mandiri per layanan, resiliensi yang lebih tinggi terhadap kegagalan sementara, serta keterlacakan menyeluruh melalui log Kafka yang tidak dapat diubah.
public record OrderEvent(
// Identifier unik peristiwa untuk idempotensi
String eventId,
// Jenis peristiwa untuk routing
String eventType,
// Stempel waktu pembuatan
Instant createdAt,
// Payload bisnis
OrderPayload payload
) {
// Factory method yang menjamin keunikan eventId
public static OrderEvent created(OrderPayload payload) {
return new OrderEvent(
UUID.randomUUID().toString(),
"ORDER_CREATED",
Instant.now(),
payload
);
}
}
public record OrderPayload(
Long orderId,
Long customerId,
List<OrderItem> items,
BigDecimal totalAmount
) {}Struktur peristiwa secara sistematis menyertakan identifier unik, tipe, dan stempel waktu. Metadata ini memungkinkan penyaringan di sisi consumer, pendeteksian duplikat, dan pelacakan temporal.
Konfigurasi dasar Spring Kafka
Integrasi dimulai dengan starter spring-kafka dan konfigurasi YAML minimal. Spring Boot mengonfigurasi bean-bean penting secara otomatis: KafkaTemplate untuk produksi dan ConcurrentKafkaListenerContainerFactory untuk konsumsi.
# application.yml
spring:
kafka:
# Alamat broker Kafka (cluster)
bootstrap-servers: localhost:9092
# Konfigurasi producer
producer:
# Serialisasi key bertipe String
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Serialisasi value JSON
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# Menunggu konfirmasi seluruh replika
acks: all
# Jumlah percobaan ulang saat gagal jaringan
retries: 3
# Konfigurasi consumer
consumer:
# Identifier consumer group
group-id: order-service
# Deserialisasi key
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Deserialisasi JSON dengan tipe target
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# Posisi awal jika belum ada offset tercatat
auto-offset-reset: earliest
# Nonaktifkan auto-commit untuk kontrol manual
enable-auto-commit: false
properties:
# Paket terpercaya untuk deserialisasi
spring.json.trusted.packages: com.example.eventsMenonaktifkan enable-auto-commit adalah praktik produksi yang esensial. Commit offset manual menjamin pesan hanya ditandai sebagai diproses setelah pemrosesan benar-benar selesai.
Membuat producer Kafka
KafkaTemplate mengenkapsulasi logika pengiriman ke Kafka. Injeksi langsungnya memungkinkan penggunaan instan di dalam layanan bisnis.
@Service
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
// Topic tujuan (dieksternalkan dalam konfigurasi)
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
OrderPayload payload) {
// Pembuatan peristiwa beserta metadatanya
OrderEvent event = OrderEvent.created(payload);
// Penggunaan orderId sebagai partition key
// Menjamin urutan peristiwa untuk pesanan yang sama
String partitionKey = payload.orderId().toString();
log.info("Publishing ORDER_CREATED event: {} to topic: {}",
event.eventId(), ordersTopic);
// Pengiriman asinkron dengan callback
return kafkaTemplate.send(ordersTopic, partitionKey, event)
.whenComplete((result, ex) -> {
if (ex == null) {
// Sukses: log metadata pengiriman
RecordMetadata metadata = result.getRecordMetadata();
log.info("Event sent successfully: topic={}, partition={}, offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
// Gagal: log kesalahan untuk investigasi
log.error("Failed to send event: {}", event.eventId(), ex);
}
});
}
}Penggunaan partition key berbasis identifier bisnis memastikan seluruh peristiwa dari entitas yang sama jatuh ke partition yang sama, sehingga urutan kronologisnya tetap terjaga.
Key bernilai null mendistribusikan pesan secara round-robin ke seluruh partition. Cara ini memaksimalkan paralelisme tetapi kehilangan jaminan urutan. Pemilihan key bergantung pada kebutuhan bisnis.
Consumer dasar dengan @KafkaListener
Anotasi @KafkaListener mengubah sebuah metode menjadi consumer Kafka. Spring secara otomatis mengelola loop polling, deserialisasi, dan commit offset.
@Service
@Slf4j
public class OrderEventConsumer {
private final OrderProcessingService processingService;
public OrderEventConsumer(OrderProcessingService processingService) {
this.processingService = processingService;
}
@KafkaListener(
// Topic yang didengarkan
topics = "${app.kafka.topics.orders}",
// Consumer group
groupId = "${spring.kafka.consumer.group-id}",
// Factory kustom untuk konfigurasi lanjutan
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
// Payload yang dideserialisasi otomatis
OrderEvent event,
// Metadata Kafka yang disuntikkan
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
// Acknowledgment untuk commit manual
Acknowledgment acknowledgment) {
log.info("Received event: type={}, partition={}, offset={}",
event.eventType(), partition, offset);
try {
// Pemrosesan bisnis
processingService.process(event);
// Commit offset hanya setelah pemrosesan berhasil
acknowledgment.acknowledge();
log.info("Event processed successfully: {}", event.eventId());
} catch (Exception ex) {
// Tidak memanggil acknowledge() menyebabkan pemrosesan ulang
log.error("Failed to process event: {}", event.eventId(), ex);
throw ex;
}
}
}Menyuntikkan Acknowledgment memungkinkan kontrol commit yang eksplisit. Tanpa pemanggilan acknowledge(), offset tetap belum di-commit dan pesan akan dikirim ulang pada poll berikutnya.
Konfigurasi ConsumerFactory tingkat lanjut
Konfigurasi default sudah memadai untuk pengembangan tetapi memerlukan penyesuaian untuk produksi. Factory kustom memberikan kendali halus atas perilaku consumer.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, OrderEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// Konfigurasi koneksi
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// Deserialisasi
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// Pengelolaan offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Performa: jumlah record per poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// Timeout sesi untuk deteksi kegagalan
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// Interval heartbeat (1/3 dari timeout sesi)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
// Waktu maksimum pemrosesan sebelum rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// Konfigurasi deserializer JSON
JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
deserializer.addTrustedPackages("com.example.events");
deserializer.setUseTypeMapperForKey(false);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
deserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Mode acknowledgment manual
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Jumlah thread consumer
factory.setConcurrency(3);
// Pemrosesan batch dimatikan (satu pesan setiap kali)
factory.setBatchListener(false);
return factory;
}
}Parameter MAX_POLL_INTERVAL_MS_CONFIG menentukan jeda maksimum antara pemanggilan poll(). Jika dilampaui, consumer akan dikeluarkan dari grup dan terjadi rebalance. Nilai ini harus mencerminkan waktu pemrosesan maksimum yang diharapkan.
Siap menguasai wawancara Spring Boot Anda?
Berlatih dengan simulator interaktif, flashcards, dan tes teknis kami.
Strategi retry dengan RetryTemplate
Kesalahan sementara (ketidaktersediaan layanan singkat, timeout jaringan) memerlukan percobaan ulang otomatis. Spring Kafka berintegrasi dengan RetryTemplate untuk menerapkan kebijakan retry yang canggih.
@Configuration
public class KafkaRetryConfig {
@Bean
public RetryTemplate kafkaRetryTemplate() {
RetryTemplate template = new RetryTemplate();
// Kebijakan retry: maksimum 3 percobaan
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
template.setRetryPolicy(retryPolicy);
// Backoff eksponensial: 1d, 2d, 4d antar percobaan
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
retryableKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
RetryTemplate kafkaRetryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Konfigurasi retry dengan callback recovery
factory.setCommonErrorHandler(
new DefaultErrorHandler(
// Recovery: tindakan setelah percobaan habis
(record, exception) -> {
log.error("All retries exhausted for record: key={}, value={}",
record.key(), record.value(), exception);
},
// Backoff eksponensial: awal 1d, maks 30d, 3 percobaan
new ExponentialBackOff(1000L, 2.0)
)
);
return factory;
}
}DefaultErrorHandler menggantikan SeekToCurrentErrorHandler lama sejak Spring Kafka 2.8. API-nya lebih jelas dan opsinya lebih lengkap.
Mengimplementasikan Dead Letter Queue
Setelah percobaan habis, pesan yang gagal sebaiknya dialihkan ke Dead Letter Queue (DLQ) untuk dianalisis kemudian. Pendekatan ini menghindari kehilangan data sekaligus melepaskan consumer dari kebuntuan.
@Configuration
public class DeadLetterConfig {
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
KafkaTemplate<String, Object> kafkaTemplate) {
// Strategi penamaan topic DLT: original-topic.DLT
return new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, exception) -> {
// Topic DLT berbasis topic asal
String dltTopic = record.topic() + ".DLT";
log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
dltTopic, record.key(), exception.getMessage());
return new TopicPartition(dltTopic, record.partition());
}
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
dltKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// ErrorHandler dengan recovery DLT
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
deadLetterPublishingRecoverer,
new FixedBackOff(1000L, 3)
);
// Exception yang tidak boleh diulang (langsung ke DLT)
errorHandler.addNotRetryableExceptions(
ValidationException.class,
JsonParseException.class,
NullPointerException.class
);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}Memisahkan exception yang dapat diulang dengan yang tidak akan mengoptimalkan perilaku. ValidationException menandakan data yang salah bentuk dan tidak akan terbenahi dengan retry, sehingga pengalihan langsung ke DLT layak dilakukan.
Consumer DLT untuk pemrosesan ulang manual
Consumer khusus memantau DLT dan memungkinkan pesan diproses ulang setelah masalah dasarnya diperbaiki.
@Service
@Slf4j
public class DeadLetterConsumer {
private final AlertingService alertingService;
private final FailedEventRepository failedEventRepository;
public DeadLetterConsumer(AlertingService alertingService,
FailedEventRepository failedEventRepository) {
this.alertingService = alertingService;
this.failedEventRepository = failedEventRepository;
}
@KafkaListener(
topics = "${app.kafka.topics.orders}.DLT",
groupId = "order-service-dlt"
)
public void handleDeadLetter(
ConsumerRecord<String, OrderEvent> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
@Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
@Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
@Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
Acknowledgment acknowledgment) {
log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
originalTopic, originalPartition, originalOffset, exceptionMessage);
// Persistensi untuk analisis dan pemrosesan ulang nantinya
FailedEvent failedEvent = FailedEvent.builder()
.eventId(record.value().eventId())
.eventType(record.value().eventType())
.payload(serializePayload(record.value()))
.originalTopic(originalTopic)
.originalPartition(originalPartition)
.originalOffset(originalOffset)
.exceptionMessage(exceptionMessage)
.stacktrace(stacktrace)
.status(FailedEventStatus.PENDING)
.createdAt(Instant.now())
.build();
failedEventRepository.save(failedEvent);
// Peringatan untuk intervensi manusia
alertingService.notifyDeadLetter(failedEvent);
acknowledgment.acknowledge();
}
}Spring Kafka secara otomatis memperkaya pesan DLT dengan header berisi metadata kegagalan: exception, stacktrace, topic asal, partition, dan offset. Informasi ini mempermudah diagnosis.
Penanganan idempotensi di sisi consumer
Kafka menjamin pengiriman "at least once": sebuah pesan dapat dikirim beberapa kali jika terjadi crash setelah pemrosesan tetapi sebelum commit. Idempotensi di sisi consumer mencegah efek samping dari pemrosesan ulang.
@Service
@Slf4j
public class IdempotentOrderProcessor {
private final ProcessedEventRepository processedEventRepository;
private final OrderService orderService;
public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
OrderService orderService) {
this.processedEventRepository = processedEventRepository;
this.orderService = orderService;
}
@Transactional
public void processIdempotent(OrderEvent event) {
String eventId = event.eventId();
// Pengecekan: peristiwa sudah diproses?
if (processedEventRepository.existsByEventId(eventId)) {
log.info("Event already processed, skipping: {}", eventId);
return;
}
// Pemrosesan bisnis
switch (event.eventType()) {
case "ORDER_CREATED" -> orderService.createOrder(event.payload());
case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
default -> log.warn("Unknown event type: {}", event.eventType());
}
// Mencatat pemrosesan dalam transaksi yang sama
ProcessedEvent processed = ProcessedEvent.builder()
.eventId(eventId)
.eventType(event.eventType())
.processedAt(Instant.now())
.build();
processedEventRepository.save(processed);
log.info("Event processed and recorded: {}", eventId);
}
}@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {
// Indeks pada eventId untuk performa optimal
boolean existsByEventId(String eventId);
// Pembersihan record lama
@Modifying
@Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
int deleteOlderThan(@Param("before") Instant before);
}Mencatat pemrosesan dalam transaksi yang sama dengan logika bisnis menjamin atomisitas. Crash setelah pemrosesan tetapi sebelum commit akan memutar ulang kedua operasi secara konsisten.
Pola Transactional Outbox
Pola Outbox menyelesaikan masalah konsistensi antara basis data dan Kafka. Alih-alih mempublikasikan langsung ke Kafka, peristiwa pertama-tama dipersistensi di tabel "outbox" lalu diteruskan oleh proses khusus.
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String eventId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String eventType;
@Column(columnDefinition = "TEXT", nullable = false)
private String payload;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private OutboxStatus status;
@Column(nullable = false)
private Instant createdAt;
private Instant publishedAt;
}
public enum OutboxStatus {
PENDING, PUBLISHED, FAILED
}@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
public OrderService(OrderRepository orderRepository,
OutboxRepository outboxRepository,
ObjectMapper objectMapper) {
this.orderRepository = orderRepository;
this.outboxRepository = outboxRepository;
this.objectMapper = objectMapper;
}
public Order createOrder(CreateOrderRequest request) {
// Membuat pesanan
Order order = Order.builder()
.customerId(request.customerId())
.items(request.items())
.totalAmount(calculateTotal(request.items()))
.status(OrderStatus.CREATED)
.createdAt(Instant.now())
.build();
Order savedOrder = orderRepository.save(order);
// Membuat peristiwa outbox dalam transaksi yang sama
OrderEvent event = OrderEvent.created(toPayload(savedOrder));
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(event.eventId())
.aggregateType("Order")
.aggregateId(savedOrder.getId().toString())
.eventType(event.eventType())
.payload(serialize(event))
.status(OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxRepository.save(outboxEvent);
return savedOrder;
}
private String serialize(OrderEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize event", e);
}
}
}Relayer outbox mempublikasikan peristiwa ke Kafka secara asinkron:
@Component
@Slf4j
public class OutboxRelayScheduler {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OutboxRelayScheduler(OutboxRepository outboxRepository,
KafkaTemplate<String, String> kafkaTemplate) {
this.outboxRepository = outboxRepository;
this.kafkaTemplate = kafkaTemplate;
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void relayPendingEvents() {
// Mengambil peristiwa pending dengan lock
List<OutboxEvent> pendingEvents = outboxRepository
.findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
for (OutboxEvent event : pendingEvents) {
try {
// Mempublikasikan ke Kafka
kafkaTemplate.send(
ordersTopic,
event.getAggregateId(),
event.getPayload()
).get(5, TimeUnit.SECONDS);
// Memperbarui status
event.setStatus(OutboxStatus.PUBLISHED);
event.setPublishedAt(Instant.now());
outboxRepository.save(event);
log.debug("Outbox event published: {}", event.getEventId());
} catch (Exception ex) {
log.error("Failed to publish outbox event: {}",
event.getEventId(), ex);
event.setStatus(OutboxStatus.FAILED);
outboxRepository.save(event);
}
}
}
}Pola ini memastikan sebuah peristiwa hanya dipublikasikan jika transaksi bisnis berhasil, sehingga inkonsistensi antara basis data dan Kafka dihilangkan.
Mulai berlatih!
Uji pengetahuan Anda dengan simulator wawancara dan tes teknis kami.
Pemantauan dan observabilitas
Mengawasi sistem Kafka memerlukan metrik tentang latensi konsumsi, lag, dan kesalahan. Spring Boot Actuator memaparkan metrik-metrik ini melalui Micrometer.
@Configuration
public class KafkaMetricsConfig {
@Bean
public MeterBinder kafkaConsumerMetrics(
ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
return registry -> {
// Metrik kustom: peristiwa yang berhasil diproses
Counter.builder("kafka.consumer.events.processed")
.description("Number of events successfully processed")
.tag("topic", "orders")
.register(registry);
// Metrik kustom: peristiwa yang gagal
Counter.builder("kafka.consumer.events.failed")
.description("Number of events that failed processing")
.tag("topic", "orders")
.register(registry);
};
}
}@Service
@Slf4j
public class InstrumentedOrderConsumer {
private final OrderProcessingService processingService;
private final MeterRegistry meterRegistry;
private final Counter processedCounter;
private final Counter failedCounter;
private final Timer processingTimer;
public InstrumentedOrderConsumer(OrderProcessingService processingService,
MeterRegistry meterRegistry) {
this.processingService = processingService;
this.meterRegistry = meterRegistry;
// Inisialisasi counter
this.processedCounter = Counter.builder("kafka.consumer.events.processed")
.tag("topic", "orders")
.register(meterRegistry);
this.failedCounter = Counter.builder("kafka.consumer.events.failed")
.tag("topic", "orders")
.register(meterRegistry);
// Timer untuk mengukur latensi pemrosesan
this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
.tag("topic", "orders")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
}
@KafkaListener(topics = "${app.kafka.topics.orders}")
public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
processingService.process(event);
acknowledgment.acknowledge();
processedCounter.increment();
sample.stop(processingTimer);
} catch (Exception ex) {
failedCounter.increment();
sample.stop(processingTimer);
throw ex;
}
}
}Metrik ini memungkinkan konfigurasi alert pada lag consumer, tingkat kesalahan, dan latensi pemrosesan, yang esensial untuk mendeteksi masalah secara proaktif.
Kesimpulan
Spring Kafka menyediakan integrasi yang andal untuk membangun arsitektur event-driven yang resilien. Menguasai mekanisme retry, dead letter queue, dan idempotensi menjadi pondasi aplikasi yang siap produksi.
Checklist arsitektur event-driven dengan Spring Kafka:
- ✅ Mengonfigurasi commit offset manual dengan
AckMode.MANUAL - ✅ Menggunakan partition key yang konsisten untuk mempertahankan urutan peristiwa
- ✅ Menerapkan retry dengan backoff eksponensial melalui
DefaultErrorHandler - ✅ Mengarahkan pesan yang gagal ke Dead Letter Queue
- ✅ Menjamin idempotensi di sisi consumer dengan pelacakan eventId
- ✅ Mempertimbangkan pola Outbox untuk konsistensi basis data dan Kafka
- ✅ Memaparkan metrik melalui Micrometer untuk pemantauan
- ✅ Menyetel
MAX_POLL_INTERVAL_MSsesuai waktu pemrosesan maksimum
Tag
Bagikan
Artikel terkait

Wawancara Spring Cloud Gateway: Routing, Filter, dan Load Balancing
Kuasai Spring Cloud Gateway untuk wawancara teknis: 12 pertanyaan tentang routing, filter, load balancing, dan pola API Gateway dengan contoh kode.

Logging Spring Boot di 2026: log terstruktur produksi dengan Logback dan JSON
Panduan lengkap logging terstruktur di Spring Boot. Konfigurasi Logback JSON, MDC untuk tracing, praktik terbaik di produksi, dan integrasi ELK Stack.

Wawancara Spring GraphQL: Resolver, DataLoader, dan Solusi Masalah N+1
Persiapkan diri untuk wawancara Spring GraphQL dengan panduan lengkap ini. Resolver, DataLoader, penanganan masalah N+1, mutation, dan praktik terbaik untuk pertanyaan teknis.