Spring Kafka: dayanıklı tüketicilerle event-driven mimari
Event-driven mimariler için kapsamlı Spring Kafka rehberi. Yapılandırma, dayanıklı tüketiciler, retry politikaları, dead letter queue ve dağıtık uygulamalar için üretim kalıpları.

Apache Kafka, büyük ölçekli event-driven mimarileri için fiili standart hâline gelmiştir. Spring Kafka, Spring Boot uygulamalarına entegrasyonunu basitleştirirken üretim ortamları için kritik dayanıklılık mekanizmalarını da sunar. Bu rehber yapılandırmayı, tüketim kalıplarını ve hata yönetim stratejilerini derinlemesine ele alır.
Bu rehber, Kafka'nın temel kavramlarına aşinalık varsayar: topic'ler, partition'lar, consumer group'lar ve offset'ler. Vurgu, Spring entegrasyonu ve dayanıklılık kalıplarındadır.
Neden event-driven mimari seçilmeli?
Event-driven mimariler, sistem bileşenlerini asenkron olaylar aracılığıyla birbirinden ayırır. Senkron REST çağrılarının aksine, üreticiler yanıt beklemeden olay yayınlar ve tüketicilerin kendi temposunda işlem yapmasına izin verilir.
Bu yaklaşım birkaç kritik avantaj sağlar: her servisin bağımsız yatay ölçeklenebilirliği, geçici arızalara karşı artan dayanıklılık ve Kafka'nın değiştirilemez log'u sayesinde tam izlenebilirlik.
public record OrderEvent(
// İdempotans için olayın benzersiz tanımlayıcısı
String eventId,
// Yönlendirme için olay türü
String eventType,
// Oluşturulma zaman damgası
Instant createdAt,
// İş yükü (payload)
OrderPayload payload
) {
// eventId benzersizliğini garanti eden factory metodu
public static OrderEvent created(OrderPayload payload) {
return new OrderEvent(
UUID.randomUUID().toString(),
"ORDER_CREATED",
Instant.now(),
payload
);
}
}
public record OrderPayload(
Long orderId,
Long customerId,
List<OrderItem> items,
BigDecimal totalAmount
) {}Olay yapısı sistematik biçimde benzersiz bir tanımlayıcı, bir tür ve bir zaman damgası içerir. Bu meta veriler tüketici tarafında filtreleme, yinelenenleri tespit etme ve zamansal izlemeyi mümkün kılar.
Temel Spring Kafka yapılandırması
Entegrasyon spring-kafka starter'ı ve minimum YAML yapılandırması ile başlar. Spring Boot temel bean'leri otomatik olarak yapılandırır: üretim için KafkaTemplate ve tüketim için ConcurrentKafkaListenerContainerFactory.
# application.yml
spring:
kafka:
# Kafka broker adresleri (cluster)
bootstrap-servers: localhost:9092
# Producer yapılandırması
producer:
# String anahtar serileştirme
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# JSON değer serileştirme
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# Tüm replikalardan onay bekle
acks: all
# Ağ hatasında yeniden deneme sayısı
retries: 3
# Consumer yapılandırması
consumer:
# Consumer group tanımlayıcısı
group-id: order-service
# Anahtar deserileştirme
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Hedef türü ile JSON deserileştirme
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# Kayıtlı offset yoksa başlangıç pozisyonu
auto-offset-reset: earliest
# Manuel kontrol için auto-commit'i devre dışı bırak
enable-auto-commit: false
properties:
# Deserileştirme için güvenilir paket
spring.json.trusted.packages: com.example.eventsenable-auto-commit özelliğini kapatmak üretimde temel bir uygulamadır. Manuel offset commit'leri, bir mesajın yalnızca gerçek işleme tamamlandıktan sonra işlenmiş olarak işaretlenmesini sağlar.
Kafka producer oluşturma
KafkaTemplate Kafka'ya gönderim mantığını kapsar. Doğrudan enjeksiyon, iş servislerinde anında kullanım sağlar.
@Service
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
// Hedef topic (yapılandırmada dışarıya alınmış)
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
OrderPayload payload) {
// Meta verilerle olay oluşturma
OrderEvent event = OrderEvent.created(payload);
// Partition anahtarı olarak orderId kullanımı
// Aynı sipariş için olay sırasını garanti eder
String partitionKey = payload.orderId().toString();
log.info("Publishing ORDER_CREATED event: {} to topic: {}",
event.eventId(), ordersTopic);
// Callback ile asenkron gönderim
return kafkaTemplate.send(ordersTopic, partitionKey, event)
.whenComplete((result, ex) -> {
if (ex == null) {
// Başarı: gönderim meta verilerini logla
RecordMetadata metadata = result.getRecordMetadata();
log.info("Event sent successfully: topic={}, partition={}, offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
// Hata: araştırma için hatayı logla
log.error("Failed to send event: {}", event.eventId(), ex);
}
});
}
}İş tanımlayıcısına dayalı bir partition anahtarı kullanmak, aynı varlığa ait tüm olayların aynı partition'a düşmesini ve böylece kronolojik sıralarının korunmasını garanti eder.
Boş bir anahtar, mesajları partition'lar arasında round-robin biçiminde dağıtır. Bu, paralelliği artırır ancak sıra garantilerini kaybeder. Anahtar seçimi iş gereksinimlerine bağlıdır.
@KafkaListener ile temel consumer
@KafkaListener anotasyonu bir metodu Kafka tüketicisine dönüştürür. Spring polling döngüsünü, deserileştirmeyi ve offset commit'lerini otomatik olarak yönetir.
@Service
@Slf4j
public class OrderEventConsumer {
private final OrderProcessingService processingService;
public OrderEventConsumer(OrderProcessingService processingService) {
this.processingService = processingService;
}
@KafkaListener(
// Dinlenecek topic(ler)
topics = "${app.kafka.topics.orders}",
// Consumer group
groupId = "${spring.kafka.consumer.group-id}",
// Gelişmiş yapılandırma için özel factory
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
// Otomatik deserileştirilmiş payload
OrderEvent event,
// Enjekte edilen Kafka meta verileri
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
// Manuel commit için Acknowledgment
Acknowledgment acknowledgment) {
log.info("Received event: type={}, partition={}, offset={}",
event.eventType(), partition, offset);
try {
// İş işlemi
processingService.process(event);
// Offset commit'i yalnızca başarılı işlemden sonra
acknowledgment.acknowledge();
log.info("Event processed successfully: {}", event.eventId());
} catch (Exception ex) {
// acknowledge() çağrılmaması yeniden işlemeye yol açar
log.error("Failed to process event: {}", event.eventId(), ex);
throw ex;
}
}
}Acknowledgment enjekte etmek commit üzerinde açık kontrol sağlar. acknowledge() çağrısı yapılmazsa offset commit edilmemiş kalır ve mesaj bir sonraki poll'da yeniden teslim edilir.
Gelişmiş ConsumerFactory yapılandırması
Varsayılan yapılandırma geliştirme için uygundur ancak üretim için ayarlamalar gerektirir. Özel bir factory consumer davranışı üzerinde ince ayar imkânı sağlar.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, OrderEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// Bağlantı yapılandırması
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// Deserileştirme
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// Offset yönetimi
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Performans: poll başına kayıt sayısı
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// Hata tespiti için oturum zaman aşımı
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// Heartbeat aralığı (oturum zaman aşımının 1/3'ü)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
// Rebalance öncesi maksimum işlem süresi
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// JSON deserializer yapılandırması
JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
deserializer.addTrustedPackages("com.example.events");
deserializer.setUseTypeMapperForKey(false);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
deserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Manuel acknowledgment modu
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Tüketici thread sayısı
factory.setConcurrency(3);
// Toplu işleme devre dışı (her seferinde tek mesaj)
factory.setBatchListener(false);
return factory;
}
}MAX_POLL_INTERVAL_MS_CONFIG parametresi, poll() çağrıları arasındaki maksimum gecikmeyi tanımlar. Aşılması, tüketicinin gruptan çıkarılmasına ve bir rebalance'a neden olur. Bu değer beklenen maksimum işlem süresini yansıtmalıdır.
Spring Boot mülakatlarında başarılı olmaya hazır mısın?
İnteraktif simülatörler, flashcards ve teknik testlerle pratik yap.
RetryTemplate ile yeniden deneme stratejileri
Geçici hatalar (bir servisin kısa süreli erişilmezliği, ağ zaman aşımları) otomatik yeniden denemeler gerektirir. Spring Kafka, sofistike retry politikalarını uygulamak için RetryTemplate ile entegre olur.
@Configuration
public class KafkaRetryConfig {
@Bean
public RetryTemplate kafkaRetryTemplate() {
RetryTemplate template = new RetryTemplate();
// Yeniden deneme politikası: en fazla 3 deneme
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
template.setRetryPolicy(retryPolicy);
// Üstel backoff: denemeler arası 1s, 2s, 4s
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
retryableKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
RetryTemplate kafkaRetryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Recovery callback ile retry yapılandırması
factory.setCommonErrorHandler(
new DefaultErrorHandler(
// Recovery: denemeler tükendikten sonraki eylem
(record, exception) -> {
log.error("All retries exhausted for record: key={}, value={}",
record.key(), record.value(), exception);
},
// Üstel backoff: 1s başlangıç, maks 30s, 3 deneme
new ExponentialBackOff(1000L, 2.0)
)
);
return factory;
}
}DefaultErrorHandler, Spring Kafka 2.8'den itibaren eski SeekToCurrentErrorHandler'ın yerini alır. Daha net bir API ve genişletilmiş yapılandırma seçenekleri sunar.
Dead Letter Queue uygulaması
Denemeler tükendikten sonra, başarısız mesajlar daha sonra analiz edilmek üzere bir Dead Letter Queue (DLQ) içine yönlendirilmelidir. Bu yaklaşım veri kaybını önlerken tüketicinin de takılı kalmasını engeller.
@Configuration
public class DeadLetterConfig {
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
KafkaTemplate<String, Object> kafkaTemplate) {
// DLT topic adlandırma stratejisi: original-topic.DLT
return new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, exception) -> {
// Kaynak topic'e dayalı DLT topic
String dltTopic = record.topic() + ".DLT";
log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
dltTopic, record.key(), exception.getMessage());
return new TopicPartition(dltTopic, record.partition());
}
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
dltKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// DLT recovery'li ErrorHandler
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
deadLetterPublishingRecoverer,
new FixedBackOff(1000L, 3)
);
// Yeniden denenmeyecek istisnalar (doğrudan DLT'ye)
errorHandler.addNotRetryableExceptions(
ValidationException.class,
JsonParseException.class,
NullPointerException.class
);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}Yeniden denenebilir ve denenemez istisnaların ayırt edilmesi davranışı optimize eder. Bir ValidationException, yeniden denemelerle düzelmeyecek bozuk veriye işaret eder ve doğrudan DLT'ye yönlendirmeyi haklı kılar.
Manuel yeniden işleme için DLT consumer
Adanmış bir tüketici DLT'yi izler ve temel sorun giderildikten sonra mesajların yeniden işlenmesine olanak tanır.
@Service
@Slf4j
public class DeadLetterConsumer {
private final AlertingService alertingService;
private final FailedEventRepository failedEventRepository;
public DeadLetterConsumer(AlertingService alertingService,
FailedEventRepository failedEventRepository) {
this.alertingService = alertingService;
this.failedEventRepository = failedEventRepository;
}
@KafkaListener(
topics = "${app.kafka.topics.orders}.DLT",
groupId = "order-service-dlt"
)
public void handleDeadLetter(
ConsumerRecord<String, OrderEvent> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
@Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
@Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
@Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
Acknowledgment acknowledgment) {
log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
originalTopic, originalPartition, originalOffset, exceptionMessage);
// Analiz ve daha sonra yeniden işleme için kalıcılık
FailedEvent failedEvent = FailedEvent.builder()
.eventId(record.value().eventId())
.eventType(record.value().eventType())
.payload(serializePayload(record.value()))
.originalTopic(originalTopic)
.originalPartition(originalPartition)
.originalOffset(originalOffset)
.exceptionMessage(exceptionMessage)
.stacktrace(stacktrace)
.status(FailedEventStatus.PENDING)
.createdAt(Instant.now())
.build();
failedEventRepository.save(failedEvent);
// İnsan müdahalesi için uyarı
alertingService.notifyDeadLetter(failedEvent);
acknowledgment.acknowledge();
}
}Spring Kafka, DLT mesajlarını otomatik olarak hata meta verilerini içeren başlıklarla zenginleştirir: istisna, stacktrace, orijinal topic, partition ve offset. Bu bilgiler tanılamayı kolaylaştırır.
Tüketici tarafında idempotans yönetimi
Kafka "at least once" teslim garanti eder: işlemden sonra ancak commit'ten önce çökme yaşanırsa bir mesaj birden çok kez teslim edilebilir. Tüketici tarafında idempotans, yeniden işlemeden kaynaklanan yan etkileri önler.
@Service
@Slf4j
public class IdempotentOrderProcessor {
private final ProcessedEventRepository processedEventRepository;
private final OrderService orderService;
public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
OrderService orderService) {
this.processedEventRepository = processedEventRepository;
this.orderService = orderService;
}
@Transactional
public void processIdempotent(OrderEvent event) {
String eventId = event.eventId();
// Doğrulama: olay zaten işlendi mi?
if (processedEventRepository.existsByEventId(eventId)) {
log.info("Event already processed, skipping: {}", eventId);
return;
}
// İş işlemi
switch (event.eventType()) {
case "ORDER_CREATED" -> orderService.createOrder(event.payload());
case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
default -> log.warn("Unknown event type: {}", event.eventType());
}
// İşlemi aynı transaction içinde kaydet
ProcessedEvent processed = ProcessedEvent.builder()
.eventId(eventId)
.eventType(event.eventType())
.processedAt(Instant.now())
.build();
processedEventRepository.save(processed);
log.info("Event processed and recorded: {}", eventId);
}
}@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {
// Optimum performans için eventId üzerinde indeks
boolean existsByEventId(String eventId);
// Eski kayıtların temizlenmesi
@Modifying
@Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
int deleteOlderThan(@Param("before") Instant before);
}İşlemi iş mantığı ile aynı transaction içinde kaydetmek atomikliği garanti eder. İşlemden sonra ancak commit'ten önce yaşanan bir çökme her iki işlemi de tutarlı biçimde yeniden oynar.
Transactional Outbox kalıbı
Outbox kalıbı, veritabanı ile Kafka arasındaki tutarlılık sorunlarını çözer. Doğrudan Kafka'ya yayınlamak yerine olaylar önce bir "outbox" tablosunda kalıcı hâle getirilir, ardından adanmış bir süreç tarafından iletilir.
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String eventId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String eventType;
@Column(columnDefinition = "TEXT", nullable = false)
private String payload;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private OutboxStatus status;
@Column(nullable = false)
private Instant createdAt;
private Instant publishedAt;
}
public enum OutboxStatus {
PENDING, PUBLISHED, FAILED
}@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
public OrderService(OrderRepository orderRepository,
OutboxRepository outboxRepository,
ObjectMapper objectMapper) {
this.orderRepository = orderRepository;
this.outboxRepository = outboxRepository;
this.objectMapper = objectMapper;
}
public Order createOrder(CreateOrderRequest request) {
// Sipariş oluşturma
Order order = Order.builder()
.customerId(request.customerId())
.items(request.items())
.totalAmount(calculateTotal(request.items()))
.status(OrderStatus.CREATED)
.createdAt(Instant.now())
.build();
Order savedOrder = orderRepository.save(order);
// Aynı transaction içinde outbox olayı oluşturma
OrderEvent event = OrderEvent.created(toPayload(savedOrder));
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(event.eventId())
.aggregateType("Order")
.aggregateId(savedOrder.getId().toString())
.eventType(event.eventType())
.payload(serialize(event))
.status(OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxRepository.save(outboxEvent);
return savedOrder;
}
private String serialize(OrderEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize event", e);
}
}
}Outbox relayer olayları Kafka'ya asenkron olarak yayınlar:
@Component
@Slf4j
public class OutboxRelayScheduler {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OutboxRelayScheduler(OutboxRepository outboxRepository,
KafkaTemplate<String, String> kafkaTemplate) {
this.outboxRepository = outboxRepository;
this.kafkaTemplate = kafkaTemplate;
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void relayPendingEvents() {
// Bekleyen olayları kilitle birlikte alma
List<OutboxEvent> pendingEvents = outboxRepository
.findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
for (OutboxEvent event : pendingEvents) {
try {
// Kafka'ya yayınlama
kafkaTemplate.send(
ordersTopic,
event.getAggregateId(),
event.getPayload()
).get(5, TimeUnit.SECONDS);
// Durumu güncelleme
event.setStatus(OutboxStatus.PUBLISHED);
event.setPublishedAt(Instant.now());
outboxRepository.save(event);
log.debug("Outbox event published: {}", event.getEventId());
} catch (Exception ex) {
log.error("Failed to publish outbox event: {}",
event.getEventId(), ex);
event.setStatus(OutboxStatus.FAILED);
outboxRepository.save(event);
}
}
}
}Bu kalıp, bir olayın yalnızca iş transaction'ı başarılı olduğunda yayınlanmasını güvence altına alır ve veritabanı ile Kafka arasındaki tutarsızlıkları ortadan kaldırır.
Pratik yapmaya başla!
Mülakat simülatörleri ve teknik testlerle bilgini test et.
İzleme ve gözlemlenebilirlik
Bir Kafka sistemini denetlemek için tüketim gecikmesi, lag ve hatalara ilişkin metrikler gerekir. Spring Boot Actuator bu metrikleri Micrometer aracılığıyla sunar.
@Configuration
public class KafkaMetricsConfig {
@Bean
public MeterBinder kafkaConsumerMetrics(
ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
return registry -> {
// Özel metrik: işlenen olaylar
Counter.builder("kafka.consumer.events.processed")
.description("Number of events successfully processed")
.tag("topic", "orders")
.register(registry);
// Özel metrik: başarısız olaylar
Counter.builder("kafka.consumer.events.failed")
.description("Number of events that failed processing")
.tag("topic", "orders")
.register(registry);
};
}
}@Service
@Slf4j
public class InstrumentedOrderConsumer {
private final OrderProcessingService processingService;
private final MeterRegistry meterRegistry;
private final Counter processedCounter;
private final Counter failedCounter;
private final Timer processingTimer;
public InstrumentedOrderConsumer(OrderProcessingService processingService,
MeterRegistry meterRegistry) {
this.processingService = processingService;
this.meterRegistry = meterRegistry;
// Sayaçları başlatma
this.processedCounter = Counter.builder("kafka.consumer.events.processed")
.tag("topic", "orders")
.register(meterRegistry);
this.failedCounter = Counter.builder("kafka.consumer.events.failed")
.tag("topic", "orders")
.register(meterRegistry);
// İşlem gecikmesini ölçen timer
this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
.tag("topic", "orders")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
}
@KafkaListener(topics = "${app.kafka.topics.orders}")
public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
processingService.process(event);
acknowledgment.acknowledge();
processedCounter.increment();
sample.stop(processingTimer);
} catch (Exception ex) {
failedCounter.increment();
sample.stop(processingTimer);
throw ex;
}
}
}Bu metrikler, tüketici lag'i, hata oranı ve işlem gecikmesi üzerinde uyarılar yapılandırmaya olanak tanır ve sorunların proaktif olarak tespit edilmesi için kritik öneme sahiptir.
Sonuç
Spring Kafka, dayanıklı event-driven mimariler kurmak için sağlam bir entegrasyon sunar. Yeniden deneme mekanizmalarına, dead letter queue'lara ve idempotansa hâkim olmak üretime hazır uygulamaların temelini oluşturur.
Spring Kafka ile event-driven mimari kontrol listesi:
- ✅
AckMode.MANUALile manuel offset commit'i yapılandır - ✅ Olay sırasını korumak için tutarlı bir partition anahtarı kullan
- ✅
DefaultErrorHandleraracılığıyla üstel backoff ile yeniden denemeler uygula - ✅ Başarısız mesajları bir Dead Letter Queue'a yönlendir
- ✅ eventId takibi ile tüketici tarafında idempotansı garanti et
- ✅ Veritabanı/Kafka tutarlılığı için Outbox kalıbını değerlendir
- ✅ İzleme için Micrometer ile metrikleri sun
- ✅
MAX_POLL_INTERVAL_MSdeğerini maksimum işlem süresine göre ayarla
Etiketler
Paylaş
İlgili makaleler

Spring Cloud Gateway Mülakatı: Routing, Filtreler ve Load Balancing
Teknik mülakatlar için Spring Cloud Gateway'i öğrenin: routing, filtreler, load balancing ve API Gateway pattern'leri üzerine kod örnekleriyle 12 soru.

2026'da Spring Boot loglama: Logback ve JSON ile üretim ortamında yapılandırılmış loglar
Spring Boot yapılandırılmış loglama için kapsamlı rehber. Logback JSON yapılandırması, izleme için MDC, üretimde en iyi uygulamalar ve ELK Stack entegrasyonu.

Spring GraphQL Mülakatı: Resolver'lar, DataLoader'lar ve N+1 Problemi Çözümleri
Bu kapsamlı kılavuzla Spring GraphQL mülakatlarına hazırlanın. Resolver'lar, DataLoader'lar, N+1 problemi yönetimi, mutation'lar ve teknik sorular için en iyi uygulamalar.