Spring Kafka: architektura event-driven z odpornymi konsumentami
Kompletny przewodnik po Spring Kafka dla architektur event-driven. Konfiguracja, odporni konsumenci, polityki retry, dead letter queue i wzorce produkcyjne dla aplikacji rozproszonych.

Apache Kafka stał się de facto standardem dla architektur sterowanych zdarzeniami na dużą skalę. Spring Kafka upraszcza jego integrację z aplikacjami Spring Boot, zapewniając jednocześnie kluczowe mechanizmy odporności w środowiskach produkcyjnych. Niniejszy przewodnik dogłębnie omawia konfigurację, wzorce konsumpcji i strategie obsługi błędów.
Przewodnik zakłada znajomość podstawowych pojęć Kafki: topiki, partycje, consumer groups oraz offsety. Nacisk położono na integrację ze Springiem i wzorce odporności.
Dlaczego wybrać architekturę event-driven?
Architektury event-driven rozdzielają komponenty systemu poprzez asynchroniczne zdarzenia. W przeciwieństwie do synchronicznych wywołań REST, producenci emitują zdarzenia bez oczekiwania na odpowiedź, pozwalając konsumentom przetwarzać je we własnym tempie.
Takie podejście przynosi kilka kluczowych korzyści: niezależną skalowalność horyzontalną poszczególnych usług, większą odporność na chwilowe awarie oraz pełną śledzialność dzięki niemodyfikowalnemu logowi Kafki.
public record OrderEvent(
// Unikalny identyfikator zdarzenia dla idempotentności
String eventId,
// Typ zdarzenia do routingu
String eventType,
// Znacznik czasu utworzenia
Instant createdAt,
// Payload biznesowy
OrderPayload payload
) {
// Metoda fabrykująca gwarantująca unikalność eventId
public static OrderEvent created(OrderPayload payload) {
return new OrderEvent(
UUID.randomUUID().toString(),
"ORDER_CREATED",
Instant.now(),
payload
);
}
}
public record OrderPayload(
Long orderId,
Long customerId,
List<OrderItem> items,
BigDecimal totalAmount
) {}Struktura zdarzenia systematycznie zawiera unikalny identyfikator, typ oraz znacznik czasu. Te metadane umożliwiają filtrowanie po stronie konsumenta, wykrywanie duplikatów i śledzenie czasowe.
Podstawowa konfiguracja Spring Kafka
Integracja zaczyna się od startera spring-kafka i minimalnej konfiguracji YAML. Spring Boot automatycznie konfiguruje kluczowe beany: KafkaTemplate do produkcji oraz ConcurrentKafkaListenerContainerFactory do konsumpcji.
# application.yml
spring:
kafka:
# Adresy brokerów Kafka (klaster)
bootstrap-servers: localhost:9092
# Konfiguracja producenta
producer:
# Serializacja kluczy String
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Serializacja wartości JSON
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# Oczekiwanie na potwierdzenie wszystkich replik
acks: all
# Liczba ponownych prób przy błędzie sieci
retries: 3
# Konfiguracja konsumenta
consumer:
# Identyfikator consumer group
group-id: order-service
# Deserializacja kluczy
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Deserializacja JSON z typem docelowym
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# Pozycja startowa, gdy brak zarejestrowanego offsetu
auto-offset-reset: earliest
# Wyłączenie auto-commit dla ręcznej kontroli
enable-auto-commit: false
properties:
# Zaufany pakiet do deserializacji
spring.json.trusted.packages: com.example.eventsWyłączenie enable-auto-commit to kluczowa praktyka produkcyjna. Ręczne commity offsetów gwarantują, że wiadomość zostaje oznaczona jako przetworzona dopiero po faktycznym zakończeniu przetwarzania.
Tworzenie producenta Kafki
KafkaTemplate enkapsuluje logikę wysyłania do Kafki. Bezpośrednie wstrzyknięcie umożliwia natychmiastowe użycie wewnątrz serwisów biznesowych.
@Service
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
// Topik docelowy (wyłuskany z konfiguracji)
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
OrderPayload payload) {
// Tworzenie zdarzenia z metadanymi
OrderEvent event = OrderEvent.created(payload);
// Użycie orderId jako klucza partycji
// Gwarantuje kolejność zdarzeń dla tego samego zamówienia
String partitionKey = payload.orderId().toString();
log.info("Publishing ORDER_CREATED event: {} to topic: {}",
event.eventId(), ordersTopic);
// Asynchroniczna wysyłka z callbackiem
return kafkaTemplate.send(ordersTopic, partitionKey, event)
.whenComplete((result, ex) -> {
if (ex == null) {
// Sukces: log metadanych wysyłki
RecordMetadata metadata = result.getRecordMetadata();
log.info("Event sent successfully: topic={}, partition={}, offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
// Błąd: log błędu do dalszej analizy
log.error("Failed to send event: {}", event.eventId(), ex);
}
});
}
}Użycie klucza partycji opartego na identyfikatorze biznesowym zapewnia, że wszystkie zdarzenia tej samej encji trafią do tej samej partycji, dzięki czemu zachowana zostaje ich kolejność chronologiczna.
Pusty klucz rozprowadza wiadomości metodą round-robin między partycjami. Maksymalizuje to równoległość, ale traci gwarancje porządku. Wybór klucza zależy od wymagań biznesowych.
Podstawowy konsument z @KafkaListener
Adnotacja @KafkaListener przekształca metodę w konsumenta Kafki. Spring automatycznie obsługuje pętlę pollingu, deserializację oraz commity offsetów.
@Service
@Slf4j
public class OrderEventConsumer {
private final OrderProcessingService processingService;
public OrderEventConsumer(OrderProcessingService processingService) {
this.processingService = processingService;
}
@KafkaListener(
// Topik(i) do nasłuchu
topics = "${app.kafka.topics.orders}",
// Consumer group
groupId = "${spring.kafka.consumer.group-id}",
// Niestandardowa fabryka dla zaawansowanej konfiguracji
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
// Payload zdeserializowany automatycznie
OrderEvent event,
// Wstrzyknięte metadane Kafki
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
// Acknowledgment do ręcznego commita
Acknowledgment acknowledgment) {
log.info("Received event: type={}, partition={}, offset={}",
event.eventType(), partition, offset);
try {
// Przetwarzanie biznesowe
processingService.process(event);
// Commit offsetu wyłącznie po pomyślnym przetworzeniu
acknowledgment.acknowledge();
log.info("Event processed successfully: {}", event.eventId());
} catch (Exception ex) {
// Brak acknowledge() powoduje ponowne przetworzenie
log.error("Failed to process event: {}", event.eventId(), ex);
throw ex;
}
}
}Wstrzyknięcie Acknowledgment pozwala na jawną kontrolę commita. Bez wywołania acknowledge() offset pozostaje niezatwierdzony, a wiadomość zostanie ponownie dostarczona przy kolejnym pollingu.
Zaawansowana konfiguracja ConsumerFactory
Domyślna konfiguracja sprawdza się w środowisku deweloperskim, ale produkcja wymaga dostosowań. Niestandardowa fabryka oferuje precyzyjną kontrolę nad zachowaniem konsumenta.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, OrderEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// Konfiguracja połączenia
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// Deserializacja
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// Zarządzanie offsetami
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Wydajność: rekordy na pojedynczy poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// Timeout sesji do wykrywania awarii
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// Odstęp heartbeat (1/3 timeoutu sesji)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
// Maksymalny czas przetwarzania przed rebalansem
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// Konfiguracja deserializatora JSON
JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
deserializer.addTrustedPackages("com.example.events");
deserializer.setUseTypeMapperForKey(false);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
deserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Ręczny tryb acknowledgmentu
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Liczba wątków konsumenta
factory.setConcurrency(3);
// Przetwarzanie wsadowe wyłączone (jedna wiadomość naraz)
factory.setBatchListener(false);
return factory;
}
}Parametr MAX_POLL_INTERVAL_MS_CONFIG definiuje maksymalne opóźnienie między wywołaniami poll(). Jego przekroczenie powoduje wykluczenie konsumenta z grupy i rebalans. Wartość powinna odzwierciedlać maksymalny oczekiwany czas przetwarzania.
Gotowy na rozmowy o Spring Boot?
Ćwicz z naszymi interaktywnymi symulatorami, flashcards i testami technicznymi.
Strategie ponawiania z RetryTemplate
Błędy przejściowe (chwilowa niedostępność usługi, timeouty sieciowe) wymagają automatycznych ponownych prób. Spring Kafka integruje się z RetryTemplate w celu wdrożenia zaawansowanych polityk retry.
@Configuration
public class KafkaRetryConfig {
@Bean
public RetryTemplate kafkaRetryTemplate() {
RetryTemplate template = new RetryTemplate();
// Polityka retry: maks. 3 próby
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
template.setRetryPolicy(retryPolicy);
// Wykładniczy backoff: 1s, 2s, 4s między próbami
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
retryableKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
RetryTemplate kafkaRetryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// Konfiguracja retry z callbackiem recovery
factory.setCommonErrorHandler(
new DefaultErrorHandler(
// Recovery: akcja po wyczerpaniu prób
(record, exception) -> {
log.error("All retries exhausted for record: key={}, value={}",
record.key(), record.value(), exception);
},
// Wykładniczy backoff: 1s start, maks. 30s, 3 próby
new ExponentialBackOff(1000L, 2.0)
)
);
return factory;
}
}DefaultErrorHandler zastępuje starszy SeekToCurrentErrorHandler od Spring Kafka 2.8. Oferuje czytelniejsze API i rozszerzone opcje konfiguracyjne.
Implementacja Dead Letter Queue
Po wyczerpaniu prób nieudane wiadomości należy kierować do Dead Letter Queue (DLQ) w celu późniejszej analizy. Takie podejście zapobiega utracie danych, jednocześnie odblokowując konsumenta.
@Configuration
public class DeadLetterConfig {
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
KafkaTemplate<String, Object> kafkaTemplate) {
// Strategia nazewnictwa topiku DLT: original-topic.DLT
return new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, exception) -> {
// Topik DLT bazujący na topiku źródłowym
String dltTopic = record.topic() + ".DLT";
log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
dltTopic, record.key(), exception.getMessage());
return new TopicPartition(dltTopic, record.partition());
}
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
dltKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// ErrorHandler z recovery DLT
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
deadLetterPublishingRecoverer,
new FixedBackOff(1000L, 3)
);
// Wyjątki nieretryowalne (bezpośrednia wysyłka do DLT)
errorHandler.addNotRetryableExceptions(
ValidationException.class,
JsonParseException.class,
NullPointerException.class
);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}Rozróżnianie wyjątków retryowalnych i nieretryowalnych optymalizuje zachowanie. ValidationException wskazuje na nieprawidłowe dane, których ponowne próby nie naprawią, co uzasadnia bezpośrednie kierowanie do DLT.
Konsument DLT do ręcznego ponownego przetwarzania
Dedykowany konsument monitoruje DLT i umożliwia ponowne przetworzenie wiadomości po naprawieniu pierwotnego problemu.
@Service
@Slf4j
public class DeadLetterConsumer {
private final AlertingService alertingService;
private final FailedEventRepository failedEventRepository;
public DeadLetterConsumer(AlertingService alertingService,
FailedEventRepository failedEventRepository) {
this.alertingService = alertingService;
this.failedEventRepository = failedEventRepository;
}
@KafkaListener(
topics = "${app.kafka.topics.orders}.DLT",
groupId = "order-service-dlt"
)
public void handleDeadLetter(
ConsumerRecord<String, OrderEvent> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
@Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
@Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
@Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
Acknowledgment acknowledgment) {
log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
originalTopic, originalPartition, originalOffset, exceptionMessage);
// Persystencja do analizy i późniejszego reprocessingu
FailedEvent failedEvent = FailedEvent.builder()
.eventId(record.value().eventId())
.eventType(record.value().eventType())
.payload(serializePayload(record.value()))
.originalTopic(originalTopic)
.originalPartition(originalPartition)
.originalOffset(originalOffset)
.exceptionMessage(exceptionMessage)
.stacktrace(stacktrace)
.status(FailedEventStatus.PENDING)
.createdAt(Instant.now())
.build();
failedEventRepository.save(failedEvent);
// Alert do interwencji człowieka
alertingService.notifyDeadLetter(failedEvent);
acknowledgment.acknowledge();
}
}Spring Kafka automatycznie wzbogaca wiadomości DLT o nagłówki zawierające metadane awarii: wyjątek, stacktrace, oryginalny topik, partycję i offset. Te informacje ułatwiają diagnostykę.
Obsługa idempotentności po stronie konsumenta
Kafka gwarantuje dostawę "at least once": wiadomość może zostać dostarczona wielokrotnie, jeśli wystąpi awaria po przetworzeniu, ale przed commitem. Idempotentność po stronie konsumenta zapobiega skutkom ubocznym ponownego przetwarzania.
@Service
@Slf4j
public class IdempotentOrderProcessor {
private final ProcessedEventRepository processedEventRepository;
private final OrderService orderService;
public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
OrderService orderService) {
this.processedEventRepository = processedEventRepository;
this.orderService = orderService;
}
@Transactional
public void processIdempotent(OrderEvent event) {
String eventId = event.eventId();
// Weryfikacja: zdarzenie już przetworzone?
if (processedEventRepository.existsByEventId(eventId)) {
log.info("Event already processed, skipping: {}", eventId);
return;
}
// Przetwarzanie biznesowe
switch (event.eventType()) {
case "ORDER_CREATED" -> orderService.createOrder(event.payload());
case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
default -> log.warn("Unknown event type: {}", event.eventType());
}
// Zapis przetwarzania w tej samej transakcji
ProcessedEvent processed = ProcessedEvent.builder()
.eventId(eventId)
.eventType(event.eventType())
.processedAt(Instant.now())
.build();
processedEventRepository.save(processed);
log.info("Event processed and recorded: {}", eventId);
}
}@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {
// Indeks na eventId dla optymalnej wydajności
boolean existsByEventId(String eventId);
// Czyszczenie starych rekordów
@Modifying
@Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
int deleteOlderThan(@Param("before") Instant before);
}Zapis przetwarzania w tej samej transakcji co logika biznesowa gwarantuje atomowość. Awaria po przetworzeniu, ale przed commitem, spójnie odtworzy obie operacje.
Wzorzec Transactional Outbox
Wzorzec Outbox rozwiązuje problemy spójności między bazą danych a Kafką. Zamiast publikować bezpośrednio do Kafki, zdarzenia są najpierw zapisywane w tabeli "outbox", a następnie przekazywane przez dedykowany proces.
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String eventId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String eventType;
@Column(columnDefinition = "TEXT", nullable = false)
private String payload;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private OutboxStatus status;
@Column(nullable = false)
private Instant createdAt;
private Instant publishedAt;
}
public enum OutboxStatus {
PENDING, PUBLISHED, FAILED
}@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
public OrderService(OrderRepository orderRepository,
OutboxRepository outboxRepository,
ObjectMapper objectMapper) {
this.orderRepository = orderRepository;
this.outboxRepository = outboxRepository;
this.objectMapper = objectMapper;
}
public Order createOrder(CreateOrderRequest request) {
// Utworzenie zamówienia
Order order = Order.builder()
.customerId(request.customerId())
.items(request.items())
.totalAmount(calculateTotal(request.items()))
.status(OrderStatus.CREATED)
.createdAt(Instant.now())
.build();
Order savedOrder = orderRepository.save(order);
// Utworzenie zdarzenia outbox w tej samej transakcji
OrderEvent event = OrderEvent.created(toPayload(savedOrder));
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(event.eventId())
.aggregateType("Order")
.aggregateId(savedOrder.getId().toString())
.eventType(event.eventType())
.payload(serialize(event))
.status(OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxRepository.save(outboxEvent);
return savedOrder;
}
private String serialize(OrderEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize event", e);
}
}
}Outbox relayer publikuje zdarzenia do Kafki asynchronicznie:
@Component
@Slf4j
public class OutboxRelayScheduler {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OutboxRelayScheduler(OutboxRepository outboxRepository,
KafkaTemplate<String, String> kafkaTemplate) {
this.outboxRepository = outboxRepository;
this.kafkaTemplate = kafkaTemplate;
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void relayPendingEvents() {
// Pobranie oczekujących zdarzeń z lockiem
List<OutboxEvent> pendingEvents = outboxRepository
.findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
for (OutboxEvent event : pendingEvents) {
try {
// Publikacja do Kafki
kafkaTemplate.send(
ordersTopic,
event.getAggregateId(),
event.getPayload()
).get(5, TimeUnit.SECONDS);
// Aktualizacja statusu
event.setStatus(OutboxStatus.PUBLISHED);
event.setPublishedAt(Instant.now());
outboxRepository.save(event);
log.debug("Outbox event published: {}", event.getEventId());
} catch (Exception ex) {
log.error("Failed to publish outbox event: {}",
event.getEventId(), ex);
event.setStatus(OutboxStatus.FAILED);
outboxRepository.save(event);
}
}
}
}Ten wzorzec gwarantuje, że zdarzenie zostanie opublikowane wyłącznie wtedy, gdy transakcja biznesowa zakończyła się powodzeniem, eliminując niespójności między bazą danych a Kafką.
Zacznij ćwiczyć!
Sprawdź swoją wiedzę z naszymi symulatorami rozmów i testami technicznymi.
Monitoring i obserwowalność
Nadzór nad systemem Kafka wymaga metryk dotyczących opóźnienia konsumpcji, lagu i błędów. Spring Boot Actuator udostępnia te metryki przez Micrometer.
@Configuration
public class KafkaMetricsConfig {
@Bean
public MeterBinder kafkaConsumerMetrics(
ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
return registry -> {
// Niestandardowa metryka: przetworzone zdarzenia
Counter.builder("kafka.consumer.events.processed")
.description("Number of events successfully processed")
.tag("topic", "orders")
.register(registry);
// Niestandardowa metryka: zdarzenia zakończone niepowodzeniem
Counter.builder("kafka.consumer.events.failed")
.description("Number of events that failed processing")
.tag("topic", "orders")
.register(registry);
};
}
}@Service
@Slf4j
public class InstrumentedOrderConsumer {
private final OrderProcessingService processingService;
private final MeterRegistry meterRegistry;
private final Counter processedCounter;
private final Counter failedCounter;
private final Timer processingTimer;
public InstrumentedOrderConsumer(OrderProcessingService processingService,
MeterRegistry meterRegistry) {
this.processingService = processingService;
this.meterRegistry = meterRegistry;
// Inicjalizacja liczników
this.processedCounter = Counter.builder("kafka.consumer.events.processed")
.tag("topic", "orders")
.register(meterRegistry);
this.failedCounter = Counter.builder("kafka.consumer.events.failed")
.tag("topic", "orders")
.register(meterRegistry);
// Timer mierzący opóźnienie przetwarzania
this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
.tag("topic", "orders")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
}
@KafkaListener(topics = "${app.kafka.topics.orders}")
public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
processingService.process(event);
acknowledgment.acknowledge();
processedCounter.increment();
sample.stop(processingTimer);
} catch (Exception ex) {
failedCounter.increment();
sample.stop(processingTimer);
throw ex;
}
}
}Te metryki pozwalają konfigurować alerty dotyczące lagu konsumenta, wskaźnika błędów i opóźnienia przetwarzania, kluczowe dla proaktywnego wykrywania problemów.
Podsumowanie
Spring Kafka zapewnia solidną integrację do budowy odpornych architektur event-driven. Opanowanie mechanizmów retry, dead letter queue oraz idempotentności stanowi fundament aplikacji gotowych do produkcji.
Lista kontrolna architektury event-driven ze Spring Kafka:
- ✅ Skonfigurować ręczne commity offsetów z
AckMode.MANUAL - ✅ Stosować spójny klucz partycji, aby zachować kolejność zdarzeń
- ✅ Wdrożyć retry z wykładniczym backoffem za pomocą
DefaultErrorHandler - ✅ Kierować nieudane wiadomości do Dead Letter Queue
- ✅ Zapewnić idempotentność po stronie konsumenta przez śledzenie eventId
- ✅ Rozważyć wzorzec Outbox dla spójności bazy danych i Kafki
- ✅ Udostępnić metryki przez Micrometer do monitoringu
- ✅ Dostosować
MAX_POLL_INTERVAL_MSdo maksymalnego czasu przetwarzania
Tagi
Udostępnij
Powiązane artykuły

Rozmowa kwalifikacyjna Spring Cloud Gateway: Routing, Filtry i Load Balancing
Opanuj Spring Cloud Gateway na rozmowy techniczne: 12 pytań o routing, filtry, load balancing i wzorce API Gateway z przykładami kodu.

Logowanie w Spring Boot 2026: logi strukturalne na produkcji z Logback i JSON
Kompletny przewodnik po logowaniu strukturalnym w Spring Boot. Konfiguracja Logback JSON, MDC do tracingu, najlepsze praktyki produkcyjne i integracja z ELK Stack.

Rozmowa kwalifikacyjna Spring GraphQL: Resolvery, DataLoadery i Rozwiązania problemu N+1
Przygotowanie do rozmów kwalifikacyjnych Spring GraphQL z tym kompletnym przewodnikiem. Resolvery, DataLoadery, obsługa problemu N+1, mutacje i najlepsze praktyki dla pytań technicznych.