Spring Kafka: ereignisgesteuerte Architektur mit resilienten Consumern
Vollständiger Spring-Kafka-Leitfaden für ereignisgesteuerte Architekturen. Konfiguration, resiliente Consumer, Retry-Strategien, Dead Letter Queues und Produktionsmuster für verteilte Anwendungen.

Apache Kafka hat sich zum De-facto-Standard für ereignisgesteuerte Architekturen im großen Maßstab entwickelt. Spring Kafka vereinfacht dessen Integration in Spring-Boot-Anwendungen und bietet zugleich essenzielle Resilienzmechanismen für Produktionsumgebungen. Dieser Leitfaden beleuchtet ausführlich Konfiguration, Konsummuster und Strategien zur Fehlerbehandlung.
Dieser Leitfaden setzt Vertrautheit mit den Kafka-Grundkonzepten voraus: Topics, Partitionen, Consumer Groups und Offsets. Der Fokus liegt auf der Spring-Integration und den Resilienzmustern.
Warum eine ereignisgesteuerte Architektur wählen?
Ereignisgesteuerte Architekturen entkoppeln Systemkomponenten über asynchrone Ereignisse. Im Gegensatz zu synchronen REST-Aufrufen senden Producer Ereignisse, ohne auf eine Antwort zu warten, sodass Consumer im eigenen Tempo verarbeiten können.
Dieser Ansatz bringt mehrere zentrale Vorteile: unabhängige horizontale Skalierbarkeit pro Service, höhere Resilienz gegenüber temporären Ausfällen sowie vollständige Nachvollziehbarkeit über das unveränderliche Kafka-Log.
public record OrderEvent(
// Eindeutige Ereignis-ID für Idempotenz
String eventId,
// Ereignistyp für das Routing
String eventType,
// Erstellungszeitstempel
Instant createdAt,
// Fachliche Payload
OrderPayload payload
) {
// Factory-Methode garantiert die Eindeutigkeit der eventId
public static OrderEvent created(OrderPayload payload) {
return new OrderEvent(
UUID.randomUUID().toString(),
"ORDER_CREATED",
Instant.now(),
payload
);
}
}
public record OrderPayload(
Long orderId,
Long customerId,
List<OrderItem> items,
BigDecimal totalAmount
) {}Die Ereignisstruktur enthält systematisch eine eindeutige ID, einen Typ und einen Zeitstempel. Diese Metadaten ermöglichen die Filterung auf Consumer-Seite, die Erkennung von Duplikaten und ein zeitliches Tracking.
Grundlegende Spring-Kafka-Konfiguration
Die Integration startet mit dem spring-kafka-Starter und einer minimalen YAML-Konfiguration. Spring Boot konfiguriert die wesentlichen Beans automatisch: KafkaTemplate zum Produzieren und ConcurrentKafkaListenerContainerFactory zum Konsumieren.
# application.yml
spring:
kafka:
# Kafka-Broker-Adressen (Cluster)
bootstrap-servers: localhost:9092
# Producer-Konfiguration
producer:
# String-Key-Serialisierung
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# JSON-Value-Serialisierung
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# Auf Bestätigung aller Replikate warten
acks: all
# Anzahl der Wiederholungen bei Netzwerkfehlern
retries: 3
# Consumer-Konfiguration
consumer:
# ID der Consumer Group
group-id: order-service
# Key-Deserialisierung
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# JSON-Deserialisierung mit Zieltyp
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# Startposition, falls kein Offset vorhanden
auto-offset-reset: earliest
# Auto-Commit deaktivieren für manuelle Kontrolle
enable-auto-commit: false
properties:
# Vertrauenswürdiges Paket für die Deserialisierung
spring.json.trusted.packages: com.example.eventsDie Deaktivierung von enable-auto-commit ist eine essenzielle Praxis für die Produktion. Manuelle Offset-Commits stellen sicher, dass eine Nachricht erst nach erfolgreichem Abschluss der Verarbeitung als verarbeitet markiert wird.
Einen Kafka-Producer erstellen
Das KafkaTemplate kapselt die Logik zum Senden an Kafka. Die direkte Injektion ermöglicht die unmittelbare Verwendung in fachlichen Services.
@Service
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
// Ziel-Topic (extern in der Konfiguration)
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
OrderPayload payload) {
// Erzeugung des Ereignisses mit Metadaten
OrderEvent event = OrderEvent.created(payload);
// orderId als Partitionsschlüssel verwenden
// Garantiert die Reihenfolge der Ereignisse zur selben Bestellung
String partitionKey = payload.orderId().toString();
log.info("Publishing ORDER_CREATED event: {} to topic: {}",
event.eventId(), ordersTopic);
// Asynchrones Senden mit Callback
return kafkaTemplate.send(ordersTopic, partitionKey, event)
.whenComplete((result, ex) -> {
if (ex == null) {
// Erfolg: Sende-Metadaten loggen
RecordMetadata metadata = result.getRecordMetadata();
log.info("Event sent successfully: topic={}, partition={}, offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
// Fehler: Fehler zur Untersuchung loggen
log.error("Failed to send event: {}", event.eventId(), ex);
}
});
}
}Die Verwendung eines Partitionsschlüssels auf Basis der fachlichen ID stellt sicher, dass alle Ereignisse derselben Entität in derselben Partition landen und dadurch ihre chronologische Reihenfolge erhalten bleibt.
Ein Null-Schlüssel verteilt Nachrichten per Round-Robin auf die Partitionen. Das maximiert die Parallelität, opfert aber die Reihenfolgegarantien. Die Schlüsselwahl hängt von den fachlichen Anforderungen ab.
Einfacher Consumer mit @KafkaListener
Die Annotation @KafkaListener macht aus einer Methode einen Kafka-Consumer. Spring übernimmt automatisch die Polling-Schleife, die Deserialisierung und den Offset-Commit.
@Service
@Slf4j
public class OrderEventConsumer {
private final OrderProcessingService processingService;
public OrderEventConsumer(OrderProcessingService processingService) {
this.processingService = processingService;
}
@KafkaListener(
// Topic(s), die abgehört werden
topics = "${app.kafka.topics.orders}",
// Consumer Group
groupId = "${spring.kafka.consumer.group-id}",
// Eigene Factory für erweiterte Konfiguration
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
// Automatisch deserialisierte Payload
OrderEvent event,
// Eingespritzte Kafka-Metadaten
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
// Acknowledgment für manuelles Commit
Acknowledgment acknowledgment) {
log.info("Received event: type={}, partition={}, offset={}",
event.eventType(), partition, offset);
try {
// Fachliche Verarbeitung
processingService.process(event);
// Offset erst nach erfolgreicher Verarbeitung committen
acknowledgment.acknowledge();
log.info("Event processed successfully: {}", event.eventId());
} catch (Exception ex) {
// Fehlendes acknowledge() führt zur erneuten Verarbeitung
log.error("Failed to process event: {}", event.eventId(), ex);
throw ex;
}
}
}Das Einspritzen von Acknowledgment erlaubt eine explizite Steuerung des Commits. Ohne den Aufruf von acknowledge() bleibt der Offset uncommitted und die Nachricht wird beim nächsten Poll erneut zugestellt.
Erweiterte ConsumerFactory-Konfiguration
Die Standardkonfiguration ist für die Entwicklung geeignet, erfordert aber Anpassungen für die Produktion. Eine eigene Factory bietet eine feingranulare Steuerung des Consumer-Verhaltens.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, OrderEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// Verbindungs-Konfiguration
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// Deserialisierung
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// Offset-Verwaltung
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Performance: Records pro Poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// Session-Timeout zur Fehlererkennung
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// Heartbeat-Intervall (1/3 des Session-Timeouts)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
// Maximale Verarbeitungszeit vor Rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// JSON-Deserializer-Konfiguration
JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
deserializer.addTrustedPackages("com.example.events");
deserializer.setUseTypeMapperForKey(false);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
deserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Manueller Acknowledgment-Modus
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Anzahl der Consumer-Threads
factory.setConcurrency(3);
// Batch-Verarbeitung deaktiviert (eine Nachricht pro Mal)
factory.setBatchListener(false);
return factory;
}
}Der Parameter MAX_POLL_INTERVAL_MS_CONFIG definiert die maximale Verzögerung zwischen poll()-Aufrufen. Eine Überschreitung führt zum Ausschluss des Consumers aus der Gruppe und zu einem Rebalance. Der Wert sollte die maximal erwartete Verarbeitungszeit widerspiegeln.
Bereit für deine Spring Boot-Interviews?
Übe mit unseren interaktiven Simulatoren, Flashcards und technischen Tests.
Retry-Strategien mit RetryTemplate
Vorübergehende Fehler (kurzzeitige Nichterreichbarkeit eines Service, Netzwerk-Timeouts) erfordern automatische Wiederholungen. Spring Kafka integriert sich mit RetryTemplate, um anspruchsvolle Retry-Policies umzusetzen.
@Configuration
public class KafkaRetryConfig {
@Bean
public RetryTemplate kafkaRetryTemplate() {
RetryTemplate template = new RetryTemplate();
// Retry-Policy: max. 3 Versuche
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
template.setRetryPolicy(retryPolicy);
// Exponentielles Backoff: 1s, 2s, 4s zwischen den Versuchen
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
retryableKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
RetryTemplate kafkaRetryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Retry-Konfiguration mit Recovery-Callback
factory.setCommonErrorHandler(
new DefaultErrorHandler(
// Recovery: Aktion nach Erschöpfung der Versuche
(record, exception) -> {
log.error("All retries exhausted for record: key={}, value={}",
record.key(), record.value(), exception);
},
// Exponentielles Backoff: 1s initial, max. 30s, 3 Versuche
new ExponentialBackOff(1000L, 2.0)
)
);
return factory;
}
}Der DefaultErrorHandler ersetzt seit Spring Kafka 2.8 den älteren SeekToCurrentErrorHandler. Er bietet eine klarere API und erweiterte Konfigurationsmöglichkeiten.
Eine Dead Letter Queue umsetzen
Nach Erschöpfung der Versuche sollten fehlgeschlagene Nachrichten in eine Dead Letter Queue (DLQ) für eine spätere Analyse umgeleitet werden. Dieser Ansatz verhindert Datenverluste und entsperrt zugleich den Consumer.
@Configuration
public class DeadLetterConfig {
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
KafkaTemplate<String, Object> kafkaTemplate) {
// Namensschema des DLT-Topics: original-topic.DLT
return new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, exception) -> {
// DLT-Topic basierend auf dem Quell-Topic
String dltTopic = record.topic() + ".DLT";
log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
dltTopic, record.key(), exception.getMessage());
return new TopicPartition(dltTopic, record.partition());
}
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
dltKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// ErrorHandler mit DLT-Recovery
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
deadLetterPublishingRecoverer,
new FixedBackOff(1000L, 3)
);
// Nicht wiederholbare Ausnahmen (direkter DLT-Versand)
errorHandler.addNotRetryableExceptions(
ValidationException.class,
JsonParseException.class,
NullPointerException.class
);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}Die Unterscheidung zwischen wiederholbaren und nicht wiederholbaren Ausnahmen optimiert das Verhalten. Eine ValidationException weist auf fehlerhaft formatierte Daten hin, die durch erneute Versuche nicht behoben werden, was den direkten DLT-Versand rechtfertigt.
DLT-Consumer für manuelles Reprocessing
Ein dedizierter Consumer überwacht die DLT und erlaubt das Reprocessing von Nachrichten, sobald das zugrundeliegende Problem behoben ist.
@Service
@Slf4j
public class DeadLetterConsumer {
private final AlertingService alertingService;
private final FailedEventRepository failedEventRepository;
public DeadLetterConsumer(AlertingService alertingService,
FailedEventRepository failedEventRepository) {
this.alertingService = alertingService;
this.failedEventRepository = failedEventRepository;
}
@KafkaListener(
topics = "${app.kafka.topics.orders}.DLT",
groupId = "order-service-dlt"
)
public void handleDeadLetter(
ConsumerRecord<String, OrderEvent> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
@Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
@Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
@Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
Acknowledgment acknowledgment) {
log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
originalTopic, originalPartition, originalOffset, exceptionMessage);
// Persistenz für Analyse und späteres Reprocessing
FailedEvent failedEvent = FailedEvent.builder()
.eventId(record.value().eventId())
.eventType(record.value().eventType())
.payload(serializePayload(record.value()))
.originalTopic(originalTopic)
.originalPartition(originalPartition)
.originalOffset(originalOffset)
.exceptionMessage(exceptionMessage)
.stacktrace(stacktrace)
.status(FailedEventStatus.PENDING)
.createdAt(Instant.now())
.build();
failedEventRepository.save(failedEvent);
// Alarm zur menschlichen Intervention
alertingService.notifyDeadLetter(failedEvent);
acknowledgment.acknowledge();
}
}Spring Kafka reichert DLT-Nachrichten automatisch mit Headern an, die Fehler-Metadaten enthalten: Exception, Stacktrace, ursprüngliches Topic, Partition und Offset. Diese Informationen erleichtern die Diagnose.
Idempotenzbehandlung auf Consumer-Seite
Kafka garantiert eine "at least once"-Zustellung: Eine Nachricht kann mehrfach zugestellt werden, wenn ein Crash nach der Verarbeitung, aber vor dem Commit eintritt. Idempotenz auf Consumer-Seite verhindert Seiteneffekte beim erneuten Verarbeiten.
@Service
@Slf4j
public class IdempotentOrderProcessor {
private final ProcessedEventRepository processedEventRepository;
private final OrderService orderService;
public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
OrderService orderService) {
this.processedEventRepository = processedEventRepository;
this.orderService = orderService;
}
@Transactional
public void processIdempotent(OrderEvent event) {
String eventId = event.eventId();
// Prüfung: Ereignis bereits verarbeitet?
if (processedEventRepository.existsByEventId(eventId)) {
log.info("Event already processed, skipping: {}", eventId);
return;
}
// Fachliche Verarbeitung
switch (event.eventType()) {
case "ORDER_CREATED" -> orderService.createOrder(event.payload());
case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
default -> log.warn("Unknown event type: {}", event.eventType());
}
// Verarbeitung in derselben Transaktion festhalten
ProcessedEvent processed = ProcessedEvent.builder()
.eventId(eventId)
.eventType(event.eventType())
.processedAt(Instant.now())
.build();
processedEventRepository.save(processed);
log.info("Event processed and recorded: {}", eventId);
}
}@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {
// Index auf eventId für optimale Performance
boolean existsByEventId(String eventId);
// Bereinigung alter Einträge
@Modifying
@Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
int deleteOlderThan(@Param("before") Instant before);
}Die Erfassung der Verarbeitung in derselben Transaktion wie die fachliche Logik garantiert Atomizität. Ein Crash nach der Verarbeitung, aber vor dem Commit, spielt beide Operationen konsistent erneut ab.
Transactional-Outbox-Muster
Das Outbox-Muster löst Konsistenzprobleme zwischen Datenbank und Kafka. Anstatt direkt nach Kafka zu publizieren, werden Ereignisse zunächst in einer "Outbox"-Tabelle persistiert und anschließend von einem dedizierten Prozess weitergeleitet.
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String eventId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String eventType;
@Column(columnDefinition = "TEXT", nullable = false)
private String payload;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private OutboxStatus status;
@Column(nullable = false)
private Instant createdAt;
private Instant publishedAt;
}
public enum OutboxStatus {
PENDING, PUBLISHED, FAILED
}@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
public OrderService(OrderRepository orderRepository,
OutboxRepository outboxRepository,
ObjectMapper objectMapper) {
this.orderRepository = orderRepository;
this.outboxRepository = outboxRepository;
this.objectMapper = objectMapper;
}
public Order createOrder(CreateOrderRequest request) {
// Bestellung anlegen
Order order = Order.builder()
.customerId(request.customerId())
.items(request.items())
.totalAmount(calculateTotal(request.items()))
.status(OrderStatus.CREATED)
.createdAt(Instant.now())
.build();
Order savedOrder = orderRepository.save(order);
// Outbox-Ereignis in derselben Transaktion erzeugen
OrderEvent event = OrderEvent.created(toPayload(savedOrder));
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(event.eventId())
.aggregateType("Order")
.aggregateId(savedOrder.getId().toString())
.eventType(event.eventType())
.payload(serialize(event))
.status(OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxRepository.save(outboxEvent);
return savedOrder;
}
private String serialize(OrderEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize event", e);
}
}
}Der Outbox-Relayer publiziert die Ereignisse asynchron in Kafka:
@Component
@Slf4j
public class OutboxRelayScheduler {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OutboxRelayScheduler(OutboxRepository outboxRepository,
KafkaTemplate<String, String> kafkaTemplate) {
this.outboxRepository = outboxRepository;
this.kafkaTemplate = kafkaTemplate;
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void relayPendingEvents() {
// Offene Ereignisse mit Lock abrufen
List<OutboxEvent> pendingEvents = outboxRepository
.findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
for (OutboxEvent event : pendingEvents) {
try {
// In Kafka publizieren
kafkaTemplate.send(
ordersTopic,
event.getAggregateId(),
event.getPayload()
).get(5, TimeUnit.SECONDS);
// Status aktualisieren
event.setStatus(OutboxStatus.PUBLISHED);
event.setPublishedAt(Instant.now());
outboxRepository.save(event);
log.debug("Outbox event published: {}", event.getEventId());
} catch (Exception ex) {
log.error("Failed to publish outbox event: {}",
event.getEventId(), ex);
event.setStatus(OutboxStatus.FAILED);
outboxRepository.save(event);
}
}
}
}Dieses Muster stellt sicher, dass ein Ereignis nur publiziert wird, wenn die fachliche Transaktion erfolgreich war, und beseitigt Inkonsistenzen zwischen Datenbank und Kafka.
Fang an zu üben!
Teste dein Wissen mit unseren Interview-Simulatoren und technischen Tests.
Monitoring und Observability
Das Überwachen eines Kafka-Systems erfordert Metriken zu Konsumlatenz, Lag und Fehlern. Spring Boot Actuator stellt diese Metriken via Micrometer bereit.
@Configuration
public class KafkaMetricsConfig {
@Bean
public MeterBinder kafkaConsumerMetrics(
ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
return registry -> {
// Eigene Metrik: verarbeitete Ereignisse
Counter.builder("kafka.consumer.events.processed")
.description("Number of events successfully processed")
.tag("topic", "orders")
.register(registry);
// Eigene Metrik: fehlgeschlagene Ereignisse
Counter.builder("kafka.consumer.events.failed")
.description("Number of events that failed processing")
.tag("topic", "orders")
.register(registry);
};
}
}@Service
@Slf4j
public class InstrumentedOrderConsumer {
private final OrderProcessingService processingService;
private final MeterRegistry meterRegistry;
private final Counter processedCounter;
private final Counter failedCounter;
private final Timer processingTimer;
public InstrumentedOrderConsumer(OrderProcessingService processingService,
MeterRegistry meterRegistry) {
this.processingService = processingService;
this.meterRegistry = meterRegistry;
// Counter initialisieren
this.processedCounter = Counter.builder("kafka.consumer.events.processed")
.tag("topic", "orders")
.register(meterRegistry);
this.failedCounter = Counter.builder("kafka.consumer.events.failed")
.tag("topic", "orders")
.register(meterRegistry);
// Timer zur Messung der Verarbeitungslatenz
this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
.tag("topic", "orders")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
}
@KafkaListener(topics = "${app.kafka.topics.orders}")
public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
processingService.process(event);
acknowledgment.acknowledge();
processedCounter.increment();
sample.stop(processingTimer);
} catch (Exception ex) {
failedCounter.increment();
sample.stop(processingTimer);
throw ex;
}
}
}Diese Metriken erlauben das Konfigurieren von Alarmen zu Consumer-Lag, Fehlerrate und Verarbeitungslatenz und sind essenziell für eine proaktive Problemerkennung.
Fazit
Spring Kafka liefert eine robuste Integration für den Aufbau resilienter, ereignisgesteuerter Architekturen. Die Beherrschung von Retry-Mechanismen, Dead Letter Queues und Idempotenz bildet das Fundament produktionsreifer Anwendungen.
Checkliste für ereignisgesteuerte Architekturen mit Spring Kafka:
- ✅ Manuelle Offset-Commits mit
AckMode.MANUALkonfigurieren - ✅ Konsistenten Partitionsschlüssel verwenden, um die Ereignisreihenfolge zu wahren
- ✅ Wiederholungen mit exponentiellem Backoff über
DefaultErrorHandlerimplementieren - ✅ Fehlgeschlagene Nachrichten in eine Dead Letter Queue umleiten
- ✅ Idempotenz auf Consumer-Seite per eventId-Tracking sicherstellen
- ✅ Outbox-Muster für Konsistenz zwischen Datenbank und Kafka erwägen
- ✅ Metriken zur Überwachung via Micrometer bereitstellen
- ✅
MAX_POLL_INTERVAL_MSan die maximale Verarbeitungszeit anpassen
Tags
Teilen
Verwandte Artikel

Spring Cloud Gateway im Interview: Routing, Filter und Load Balancing
Spring Cloud Gateway für technische Interviews meistern: 12 Fragen zu Routing, Filtern, Load Balancing und API-Gateway-Patterns mit Codebeispielen.

Spring Boot Logging im Jahr 2026: strukturierte Logs in Produktion mit Logback und JSON
Vollständiger Leitfaden zu strukturiertem Logging in Spring Boot. Logback-JSON-Konfiguration, MDC für Tracing, Best Practices in Produktion und ELK-Stack-Integration.

Spring GraphQL Interview: Resolver, DataLoader und Lösungen für das N+1-Problem
Vorbereitung auf Spring GraphQL Interviews mit diesem vollständigen Leitfaden. Resolver, DataLoader, Umgang mit dem N+1-Problem, Mutationen und Best Practices für technische Fragen.