Spring Kafka: ereignisgesteuerte Architektur mit resilienten Consumern

Vollständiger Spring-Kafka-Leitfaden für ereignisgesteuerte Architekturen. Konfiguration, resiliente Consumer, Retry-Strategien, Dead Letter Queues und Produktionsmuster für verteilte Anwendungen.

Ereignisgesteuerte Architektur mit Spring Kafka und resilienten Consumern

Apache Kafka hat sich zum De-facto-Standard für ereignisgesteuerte Architekturen im großen Maßstab entwickelt. Spring Kafka vereinfacht dessen Integration in Spring-Boot-Anwendungen und bietet zugleich essenzielle Resilienzmechanismen für Produktionsumgebungen. Dieser Leitfaden beleuchtet ausführlich Konfiguration, Konsummuster und Strategien zur Fehlerbehandlung.

Voraussetzungen

Dieser Leitfaden setzt Vertrautheit mit den Kafka-Grundkonzepten voraus: Topics, Partitionen, Consumer Groups und Offsets. Der Fokus liegt auf der Spring-Integration und den Resilienzmustern.

Warum eine ereignisgesteuerte Architektur wählen?

Ereignisgesteuerte Architekturen entkoppeln Systemkomponenten über asynchrone Ereignisse. Im Gegensatz zu synchronen REST-Aufrufen senden Producer Ereignisse, ohne auf eine Antwort zu warten, sodass Consumer im eigenen Tempo verarbeiten können.

Dieser Ansatz bringt mehrere zentrale Vorteile: unabhängige horizontale Skalierbarkeit pro Service, höhere Resilienz gegenüber temporären Ausfällen sowie vollständige Nachvollziehbarkeit über das unveränderliche Kafka-Log.

OrderEvent.javajava
public record OrderEvent(
    // Eindeutige Ereignis-ID für Idempotenz
    String eventId,
    // Ereignistyp für das Routing
    String eventType,
    // Erstellungszeitstempel
    Instant createdAt,
    // Fachliche Payload
    OrderPayload payload
) {
    // Factory-Methode garantiert die Eindeutigkeit der eventId
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

Die Ereignisstruktur enthält systematisch eine eindeutige ID, einen Typ und einen Zeitstempel. Diese Metadaten ermöglichen die Filterung auf Consumer-Seite, die Erkennung von Duplikaten und ein zeitliches Tracking.

Grundlegende Spring-Kafka-Konfiguration

Die Integration startet mit dem spring-kafka-Starter und einer minimalen YAML-Konfiguration. Spring Boot konfiguriert die wesentlichen Beans automatisch: KafkaTemplate zum Produzieren und ConcurrentKafkaListenerContainerFactory zum Konsumieren.

yaml
# application.yml
spring:
  kafka:
    # Kafka-Broker-Adressen (Cluster)
    bootstrap-servers: localhost:9092

    # Producer-Konfiguration
    producer:
      # String-Key-Serialisierung
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # JSON-Value-Serialisierung
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Auf Bestätigung aller Replikate warten
      acks: all
      # Anzahl der Wiederholungen bei Netzwerkfehlern
      retries: 3

    # Consumer-Konfiguration
    consumer:
      # ID der Consumer Group
      group-id: order-service
      # Key-Deserialisierung
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # JSON-Deserialisierung mit Zieltyp
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Startposition, falls kein Offset vorhanden
      auto-offset-reset: earliest
      # Auto-Commit deaktivieren für manuelle Kontrolle
      enable-auto-commit: false
      properties:
        # Vertrauenswürdiges Paket für die Deserialisierung
        spring.json.trusted.packages: com.example.events

Die Deaktivierung von enable-auto-commit ist eine essenzielle Praxis für die Produktion. Manuelle Offset-Commits stellen sicher, dass eine Nachricht erst nach erfolgreichem Abschluss der Verarbeitung als verarbeitet markiert wird.

Einen Kafka-Producer erstellen

Das KafkaTemplate kapselt die Logik zum Senden an Kafka. Die direkte Injektion ermöglicht die unmittelbare Verwendung in fachlichen Services.

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Ziel-Topic (extern in der Konfiguration)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // Erzeugung des Ereignisses mit Metadaten
        OrderEvent event = OrderEvent.created(payload);

        // orderId als Partitionsschlüssel verwenden
        // Garantiert die Reihenfolge der Ereignisse zur selben Bestellung
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // Asynchrones Senden mit Callback
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Erfolg: Sende-Metadaten loggen
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // Fehler: Fehler zur Untersuchung loggen
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

Die Verwendung eines Partitionsschlüssels auf Basis der fachlichen ID stellt sicher, dass alle Ereignisse derselben Entität in derselben Partition landen und dadurch ihre chronologische Reihenfolge erhalten bleibt.

Partitionsschlüssel

Ein Null-Schlüssel verteilt Nachrichten per Round-Robin auf die Partitionen. Das maximiert die Parallelität, opfert aber die Reihenfolgegarantien. Die Schlüsselwahl hängt von den fachlichen Anforderungen ab.

Einfacher Consumer mit @KafkaListener

Die Annotation @KafkaListener macht aus einer Methode einen Kafka-Consumer. Spring übernimmt automatisch die Polling-Schleife, die Deserialisierung und den Offset-Commit.

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // Topic(s), die abgehört werden
        topics = "${app.kafka.topics.orders}",
        // Consumer Group
        groupId = "${spring.kafka.consumer.group-id}",
        // Eigene Factory für erweiterte Konfiguration
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // Automatisch deserialisierte Payload
            OrderEvent event,
            // Eingespritzte Kafka-Metadaten
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // Acknowledgment für manuelles Commit
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // Fachliche Verarbeitung
            processingService.process(event);

            // Offset erst nach erfolgreicher Verarbeitung committen
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // Fehlendes acknowledge() führt zur erneuten Verarbeitung
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

Das Einspritzen von Acknowledgment erlaubt eine explizite Steuerung des Commits. Ohne den Aufruf von acknowledge() bleibt der Offset uncommitted und die Nachricht wird beim nächsten Poll erneut zugestellt.

Erweiterte ConsumerFactory-Konfiguration

Die Standardkonfiguration ist für die Entwicklung geeignet, erfordert aber Anpassungen für die Produktion. Eine eigene Factory bietet eine feingranulare Steuerung des Consumer-Verhaltens.

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Verbindungs-Konfiguration
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Deserialisierung
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // Offset-Verwaltung
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Performance: Records pro Poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // Session-Timeout zur Fehlererkennung
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Heartbeat-Intervall (1/3 des Session-Timeouts)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // Maximale Verarbeitungszeit vor Rebalance
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // JSON-Deserializer-Konfiguration
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // Manueller Acknowledgment-Modus
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Anzahl der Consumer-Threads
        factory.setConcurrency(3);

        // Batch-Verarbeitung deaktiviert (eine Nachricht pro Mal)
        factory.setBatchListener(false);

        return factory;
    }
}

Der Parameter MAX_POLL_INTERVAL_MS_CONFIG definiert die maximale Verzögerung zwischen poll()-Aufrufen. Eine Überschreitung führt zum Ausschluss des Consumers aus der Gruppe und zu einem Rebalance. Der Wert sollte die maximal erwartete Verarbeitungszeit widerspiegeln.

Bereit für deine Spring Boot-Interviews?

Übe mit unseren interaktiven Simulatoren, Flashcards und technischen Tests.

Retry-Strategien mit RetryTemplate

Vorübergehende Fehler (kurzzeitige Nichterreichbarkeit eines Service, Netzwerk-Timeouts) erfordern automatische Wiederholungen. Spring Kafka integriert sich mit RetryTemplate, um anspruchsvolle Retry-Policies umzusetzen.

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Retry-Policy: max. 3 Versuche
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Exponentielles Backoff: 1s, 2s, 4s zwischen den Versuchen
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // Retry-Konfiguration mit Recovery-Callback
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // Recovery: Aktion nach Erschöpfung der Versuche
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // Exponentielles Backoff: 1s initial, max. 30s, 3 Versuche
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

Der DefaultErrorHandler ersetzt seit Spring Kafka 2.8 den älteren SeekToCurrentErrorHandler. Er bietet eine klarere API und erweiterte Konfigurationsmöglichkeiten.

Eine Dead Letter Queue umsetzen

Nach Erschöpfung der Versuche sollten fehlgeschlagene Nachrichten in eine Dead Letter Queue (DLQ) für eine spätere Analyse umgeleitet werden. Dieser Ansatz verhindert Datenverluste und entsperrt zugleich den Consumer.

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Namensschema des DLT-Topics: original-topic.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // DLT-Topic basierend auf dem Quell-Topic
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // ErrorHandler mit DLT-Recovery
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // Nicht wiederholbare Ausnahmen (direkter DLT-Versand)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

Die Unterscheidung zwischen wiederholbaren und nicht wiederholbaren Ausnahmen optimiert das Verhalten. Eine ValidationException weist auf fehlerhaft formatierte Daten hin, die durch erneute Versuche nicht behoben werden, was den direkten DLT-Versand rechtfertigt.

DLT-Consumer für manuelles Reprocessing

Ein dedizierter Consumer überwacht die DLT und erlaubt das Reprocessing von Nachrichten, sobald das zugrundeliegende Problem behoben ist.

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // Persistenz für Analyse und späteres Reprocessing
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // Alarm zur menschlichen Intervention
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
DLT-Header

Spring Kafka reichert DLT-Nachrichten automatisch mit Headern an, die Fehler-Metadaten enthalten: Exception, Stacktrace, ursprüngliches Topic, Partition und Offset. Diese Informationen erleichtern die Diagnose.

Idempotenzbehandlung auf Consumer-Seite

Kafka garantiert eine "at least once"-Zustellung: Eine Nachricht kann mehrfach zugestellt werden, wenn ein Crash nach der Verarbeitung, aber vor dem Commit eintritt. Idempotenz auf Consumer-Seite verhindert Seiteneffekte beim erneuten Verarbeiten.

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // Prüfung: Ereignis bereits verarbeitet?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // Fachliche Verarbeitung
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // Verarbeitung in derselben Transaktion festhalten
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // Index auf eventId für optimale Performance
    boolean existsByEventId(String eventId);

    // Bereinigung alter Einträge
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

Die Erfassung der Verarbeitung in derselben Transaktion wie die fachliche Logik garantiert Atomizität. Ein Crash nach der Verarbeitung, aber vor dem Commit, spielt beide Operationen konsistent erneut ab.

Transactional-Outbox-Muster

Das Outbox-Muster löst Konsistenzprobleme zwischen Datenbank und Kafka. Anstatt direkt nach Kafka zu publizieren, werden Ereignisse zunächst in einer "Outbox"-Tabelle persistiert und anschließend von einem dedizierten Prozess weitergeleitet.

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // Bestellung anlegen
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // Outbox-Ereignis in derselben Transaktion erzeugen
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

Der Outbox-Relayer publiziert die Ereignisse asynchron in Kafka:

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // Offene Ereignisse mit Lock abrufen
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // In Kafka publizieren
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // Status aktualisieren
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

Dieses Muster stellt sicher, dass ein Ereignis nur publiziert wird, wenn die fachliche Transaktion erfolgreich war, und beseitigt Inkonsistenzen zwischen Datenbank und Kafka.

Fang an zu üben!

Teste dein Wissen mit unseren Interview-Simulatoren und technischen Tests.

Monitoring und Observability

Das Überwachen eines Kafka-Systems erfordert Metriken zu Konsumlatenz, Lag und Fehlern. Spring Boot Actuator stellt diese Metriken via Micrometer bereit.

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // Eigene Metrik: verarbeitete Ereignisse
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // Eigene Metrik: fehlgeschlagene Ereignisse
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // Counter initialisieren
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // Timer zur Messung der Verarbeitungslatenz
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

Diese Metriken erlauben das Konfigurieren von Alarmen zu Consumer-Lag, Fehlerrate und Verarbeitungslatenz und sind essenziell für eine proaktive Problemerkennung.

Fazit

Spring Kafka liefert eine robuste Integration für den Aufbau resilienter, ereignisgesteuerter Architekturen. Die Beherrschung von Retry-Mechanismen, Dead Letter Queues und Idempotenz bildet das Fundament produktionsreifer Anwendungen.

Checkliste für ereignisgesteuerte Architekturen mit Spring Kafka:

  • ✅ Manuelle Offset-Commits mit AckMode.MANUAL konfigurieren
  • ✅ Konsistenten Partitionsschlüssel verwenden, um die Ereignisreihenfolge zu wahren
  • ✅ Wiederholungen mit exponentiellem Backoff über DefaultErrorHandler implementieren
  • ✅ Fehlgeschlagene Nachrichten in eine Dead Letter Queue umleiten
  • ✅ Idempotenz auf Consumer-Seite per eventId-Tracking sicherstellen
  • ✅ Outbox-Muster für Konsistenz zwischen Datenbank und Kafka erwägen
  • ✅ Metriken zur Überwachung via Micrometer bereitstellen
  • MAX_POLL_INTERVAL_MS an die maximale Verarbeitungszeit anpassen

Tags

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

Teilen

Verwandte Artikel