Spring Kafka: dayanıklı tüketicilerle event-driven mimari

Event-driven mimariler için kapsamlı Spring Kafka rehberi. Yapılandırma, dayanıklı tüketiciler, retry politikaları, dead letter queue ve dağıtık uygulamalar için üretim kalıpları.

Spring Kafka ile dayanıklı tüketicili event-driven mimari

Apache Kafka, büyük ölçekli event-driven mimarileri için fiili standart hâline gelmiştir. Spring Kafka, Spring Boot uygulamalarına entegrasyonunu basitleştirirken üretim ortamları için kritik dayanıklılık mekanizmalarını da sunar. Bu rehber yapılandırmayı, tüketim kalıplarını ve hata yönetim stratejilerini derinlemesine ele alır.

Ön Koşullar

Bu rehber, Kafka'nın temel kavramlarına aşinalık varsayar: topic'ler, partition'lar, consumer group'lar ve offset'ler. Vurgu, Spring entegrasyonu ve dayanıklılık kalıplarındadır.

Neden event-driven mimari seçilmeli?

Event-driven mimariler, sistem bileşenlerini asenkron olaylar aracılığıyla birbirinden ayırır. Senkron REST çağrılarının aksine, üreticiler yanıt beklemeden olay yayınlar ve tüketicilerin kendi temposunda işlem yapmasına izin verilir.

Bu yaklaşım birkaç kritik avantaj sağlar: her servisin bağımsız yatay ölçeklenebilirliği, geçici arızalara karşı artan dayanıklılık ve Kafka'nın değiştirilemez log'u sayesinde tam izlenebilirlik.

OrderEvent.javajava
public record OrderEvent(
    // İdempotans için olayın benzersiz tanımlayıcısı
    String eventId,
    // Yönlendirme için olay türü
    String eventType,
    // Oluşturulma zaman damgası
    Instant createdAt,
    // İş yükü (payload)
    OrderPayload payload
) {
    // eventId benzersizliğini garanti eden factory metodu
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

Olay yapısı sistematik biçimde benzersiz bir tanımlayıcı, bir tür ve bir zaman damgası içerir. Bu meta veriler tüketici tarafında filtreleme, yinelenenleri tespit etme ve zamansal izlemeyi mümkün kılar.

Temel Spring Kafka yapılandırması

Entegrasyon spring-kafka starter'ı ve minimum YAML yapılandırması ile başlar. Spring Boot temel bean'leri otomatik olarak yapılandırır: üretim için KafkaTemplate ve tüketim için ConcurrentKafkaListenerContainerFactory.

yaml
# application.yml
spring:
  kafka:
    # Kafka broker adresleri (cluster)
    bootstrap-servers: localhost:9092

    # Producer yapılandırması
    producer:
      # String anahtar serileştirme
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # JSON değer serileştirme
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Tüm replikalardan onay bekle
      acks: all
      # Ağ hatasında yeniden deneme sayısı
      retries: 3

    # Consumer yapılandırması
    consumer:
      # Consumer group tanımlayıcısı
      group-id: order-service
      # Anahtar deserileştirme
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Hedef türü ile JSON deserileştirme
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Kayıtlı offset yoksa başlangıç pozisyonu
      auto-offset-reset: earliest
      # Manuel kontrol için auto-commit'i devre dışı bırak
      enable-auto-commit: false
      properties:
        # Deserileştirme için güvenilir paket
        spring.json.trusted.packages: com.example.events

enable-auto-commit özelliğini kapatmak üretimde temel bir uygulamadır. Manuel offset commit'leri, bir mesajın yalnızca gerçek işleme tamamlandıktan sonra işlenmiş olarak işaretlenmesini sağlar.

Kafka producer oluşturma

KafkaTemplate Kafka'ya gönderim mantığını kapsar. Doğrudan enjeksiyon, iş servislerinde anında kullanım sağlar.

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Hedef topic (yapılandırmada dışarıya alınmış)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // Meta verilerle olay oluşturma
        OrderEvent event = OrderEvent.created(payload);

        // Partition anahtarı olarak orderId kullanımı
        // Aynı sipariş için olay sırasını garanti eder
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // Callback ile asenkron gönderim
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Başarı: gönderim meta verilerini logla
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // Hata: araştırma için hatayı logla
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

İş tanımlayıcısına dayalı bir partition anahtarı kullanmak, aynı varlığa ait tüm olayların aynı partition'a düşmesini ve böylece kronolojik sıralarının korunmasını garanti eder.

Partition anahtarı

Boş bir anahtar, mesajları partition'lar arasında round-robin biçiminde dağıtır. Bu, paralelliği artırır ancak sıra garantilerini kaybeder. Anahtar seçimi iş gereksinimlerine bağlıdır.

@KafkaListener ile temel consumer

@KafkaListener anotasyonu bir metodu Kafka tüketicisine dönüştürür. Spring polling döngüsünü, deserileştirmeyi ve offset commit'lerini otomatik olarak yönetir.

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // Dinlenecek topic(ler)
        topics = "${app.kafka.topics.orders}",
        // Consumer group
        groupId = "${spring.kafka.consumer.group-id}",
        // Gelişmiş yapılandırma için özel factory
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // Otomatik deserileştirilmiş payload
            OrderEvent event,
            // Enjekte edilen Kafka meta verileri
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // Manuel commit için Acknowledgment
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // İş işlemi
            processingService.process(event);

            // Offset commit'i yalnızca başarılı işlemden sonra
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // acknowledge() çağrılmaması yeniden işlemeye yol açar
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

Acknowledgment enjekte etmek commit üzerinde açık kontrol sağlar. acknowledge() çağrısı yapılmazsa offset commit edilmemiş kalır ve mesaj bir sonraki poll'da yeniden teslim edilir.

Gelişmiş ConsumerFactory yapılandırması

Varsayılan yapılandırma geliştirme için uygundur ancak üretim için ayarlamalar gerektirir. Özel bir factory consumer davranışı üzerinde ince ayar imkânı sağlar.

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Bağlantı yapılandırması
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Deserileştirme
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // Offset yönetimi
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Performans: poll başına kayıt sayısı
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // Hata tespiti için oturum zaman aşımı
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Heartbeat aralığı (oturum zaman aşımının 1/3'ü)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // Rebalance öncesi maksimum işlem süresi
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // JSON deserializer yapılandırması
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // Manuel acknowledgment modu
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Tüketici thread sayısı
        factory.setConcurrency(3);

        // Toplu işleme devre dışı (her seferinde tek mesaj)
        factory.setBatchListener(false);

        return factory;
    }
}

MAX_POLL_INTERVAL_MS_CONFIG parametresi, poll() çağrıları arasındaki maksimum gecikmeyi tanımlar. Aşılması, tüketicinin gruptan çıkarılmasına ve bir rebalance'a neden olur. Bu değer beklenen maksimum işlem süresini yansıtmalıdır.

Spring Boot mülakatlarında başarılı olmaya hazır mısın?

İnteraktif simülatörler, flashcards ve teknik testlerle pratik yap.

RetryTemplate ile yeniden deneme stratejileri

Geçici hatalar (bir servisin kısa süreli erişilmezliği, ağ zaman aşımları) otomatik yeniden denemeler gerektirir. Spring Kafka, sofistike retry politikalarını uygulamak için RetryTemplate ile entegre olur.

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Yeniden deneme politikası: en fazla 3 deneme
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Üstel backoff: denemeler arası 1s, 2s, 4s
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Recovery callback ile retry yapılandırması
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // Recovery: denemeler tükendikten sonraki eylem
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // Üstel backoff: 1s başlangıç, maks 30s, 3 deneme
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

DefaultErrorHandler, Spring Kafka 2.8'den itibaren eski SeekToCurrentErrorHandler'ın yerini alır. Daha net bir API ve genişletilmiş yapılandırma seçenekleri sunar.

Dead Letter Queue uygulaması

Denemeler tükendikten sonra, başarısız mesajlar daha sonra analiz edilmek üzere bir Dead Letter Queue (DLQ) içine yönlendirilmelidir. Bu yaklaşım veri kaybını önlerken tüketicinin de takılı kalmasını engeller.

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // DLT topic adlandırma stratejisi: original-topic.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // Kaynak topic'e dayalı DLT topic
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // DLT recovery'li ErrorHandler
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // Yeniden denenmeyecek istisnalar (doğrudan DLT'ye)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

Yeniden denenebilir ve denenemez istisnaların ayırt edilmesi davranışı optimize eder. Bir ValidationException, yeniden denemelerle düzelmeyecek bozuk veriye işaret eder ve doğrudan DLT'ye yönlendirmeyi haklı kılar.

Manuel yeniden işleme için DLT consumer

Adanmış bir tüketici DLT'yi izler ve temel sorun giderildikten sonra mesajların yeniden işlenmesine olanak tanır.

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // Analiz ve daha sonra yeniden işleme için kalıcılık
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // İnsan müdahalesi için uyarı
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
DLT başlıkları

Spring Kafka, DLT mesajlarını otomatik olarak hata meta verilerini içeren başlıklarla zenginleştirir: istisna, stacktrace, orijinal topic, partition ve offset. Bu bilgiler tanılamayı kolaylaştırır.

Tüketici tarafında idempotans yönetimi

Kafka "at least once" teslim garanti eder: işlemden sonra ancak commit'ten önce çökme yaşanırsa bir mesaj birden çok kez teslim edilebilir. Tüketici tarafında idempotans, yeniden işlemeden kaynaklanan yan etkileri önler.

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // Doğrulama: olay zaten işlendi mi?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // İş işlemi
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // İşlemi aynı transaction içinde kaydet
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // Optimum performans için eventId üzerinde indeks
    boolean existsByEventId(String eventId);

    // Eski kayıtların temizlenmesi
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

İşlemi iş mantığı ile aynı transaction içinde kaydetmek atomikliği garanti eder. İşlemden sonra ancak commit'ten önce yaşanan bir çökme her iki işlemi de tutarlı biçimde yeniden oynar.

Transactional Outbox kalıbı

Outbox kalıbı, veritabanı ile Kafka arasındaki tutarlılık sorunlarını çözer. Doğrudan Kafka'ya yayınlamak yerine olaylar önce bir "outbox" tablosunda kalıcı hâle getirilir, ardından adanmış bir süreç tarafından iletilir.

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // Sipariş oluşturma
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // Aynı transaction içinde outbox olayı oluşturma
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

Outbox relayer olayları Kafka'ya asenkron olarak yayınlar:

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // Bekleyen olayları kilitle birlikte alma
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // Kafka'ya yayınlama
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // Durumu güncelleme
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

Bu kalıp, bir olayın yalnızca iş transaction'ı başarılı olduğunda yayınlanmasını güvence altına alır ve veritabanı ile Kafka arasındaki tutarsızlıkları ortadan kaldırır.

Pratik yapmaya başla!

Mülakat simülatörleri ve teknik testlerle bilgini test et.

İzleme ve gözlemlenebilirlik

Bir Kafka sistemini denetlemek için tüketim gecikmesi, lag ve hatalara ilişkin metrikler gerekir. Spring Boot Actuator bu metrikleri Micrometer aracılığıyla sunar.

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // Özel metrik: işlenen olaylar
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // Özel metrik: başarısız olaylar
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // Sayaçları başlatma
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // İşlem gecikmesini ölçen timer
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

Bu metrikler, tüketici lag'i, hata oranı ve işlem gecikmesi üzerinde uyarılar yapılandırmaya olanak tanır ve sorunların proaktif olarak tespit edilmesi için kritik öneme sahiptir.

Sonuç

Spring Kafka, dayanıklı event-driven mimariler kurmak için sağlam bir entegrasyon sunar. Yeniden deneme mekanizmalarına, dead letter queue'lara ve idempotansa hâkim olmak üretime hazır uygulamaların temelini oluşturur.

Spring Kafka ile event-driven mimari kontrol listesi:

  • AckMode.MANUAL ile manuel offset commit'i yapılandır
  • ✅ Olay sırasını korumak için tutarlı bir partition anahtarı kullan
  • DefaultErrorHandler aracılığıyla üstel backoff ile yeniden denemeler uygula
  • ✅ Başarısız mesajları bir Dead Letter Queue'a yönlendir
  • ✅ eventId takibi ile tüketici tarafında idempotansı garanti et
  • ✅ Veritabanı/Kafka tutarlılığı için Outbox kalıbını değerlendir
  • ✅ İzleme için Micrometer ile metrikleri sun
  • MAX_POLL_INTERVAL_MS değerini maksimum işlem süresine göre ayarla

Etiketler

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

Paylaş

İlgili makaleler