Spring Kafka: 회복탄력성을 갖춘 컨슈머로 구축하는 이벤트 기반 아키텍처
이벤트 기반 아키텍처를 위한 완전한 Spring Kafka 가이드. 설정, 회복탄력성을 갖춘 컨슈머, 재시도 정책, Dead Letter Queue, 분산 애플리케이션을 위한 운영 패턴.

Apache Kafka는 대규모 이벤트 기반 아키텍처의 사실상 표준이 되었습니다. Spring Kafka는 Spring Boot 애플리케이션과의 통합을 단순화하면서도 운영 환경에 필수적인 회복탄력성 메커니즘을 제공합니다. 본 가이드는 설정, 컨슈밍 패턴, 오류 처리 전략을 깊이 있게 살펴봅니다.
이 가이드는 토픽, 파티션, 컨슈머 그룹, 오프셋 등 Kafka의 기본 개념에 친숙하다는 점을 전제로 합니다. 초점은 Spring 통합과 회복탄력성 패턴에 맞춰져 있습니다.
왜 이벤트 기반 아키텍처를 선택해야 할까요?
이벤트 기반 아키텍처는 비동기 이벤트를 통해 시스템 구성 요소를 분리합니다. 동기적인 REST 호출과 달리, 프로듀서는 응답을 기다리지 않고 이벤트를 발행하며, 컨슈머는 자신의 속도에 맞춰 처리할 수 있습니다.
이 접근 방식은 서비스별 독립적인 수평 확장성, 일시적 장애에 대한 향상된 회복탄력성, 그리고 Kafka의 불변 로그를 통한 완벽한 추적성과 같은 핵심 이점을 제공합니다.
public record OrderEvent(
// 멱등성을 위한 이벤트의 고유 식별자
String eventId,
// 라우팅을 위한 이벤트 유형
String eventType,
// 생성 타임스탬프
Instant createdAt,
// 비즈니스 페이로드
OrderPayload payload
) {
// eventId 고유성을 보장하는 팩토리 메서드
public static OrderEvent created(OrderPayload payload) {
return new OrderEvent(
UUID.randomUUID().toString(),
"ORDER_CREATED",
Instant.now(),
payload
);
}
}
public record OrderPayload(
Long orderId,
Long customerId,
List<OrderItem> items,
BigDecimal totalAmount
) {}이벤트 구조에는 고유 식별자, 유형, 타임스탬프가 항상 포함됩니다. 이러한 메타데이터는 컨슈머 측 필터링, 중복 감지, 시간적 추적을 가능하게 합니다.
Spring Kafka 기본 설정
통합은 spring-kafka 스타터와 최소한의 YAML 설정으로 시작합니다. Spring Boot는 발행을 위한 KafkaTemplate과 컨슈밍을 위한 ConcurrentKafkaListenerContainerFactory 같은 핵심 빈을 자동으로 구성합니다.
# application.yml
spring:
kafka:
# Kafka 브로커 주소(클러스터)
bootstrap-servers: localhost:9092
# 프로듀서 설정
producer:
# 문자열 키 직렬화
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# JSON 값 직렬화
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 모든 레플리카로부터 ACK 대기
acks: all
# 네트워크 오류 시 재시도 횟수
retries: 3
# 컨슈머 설정
consumer:
# 컨슈머 그룹 식별자
group-id: order-service
# 키 역직렬화
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 대상 타입 지정 JSON 역직렬화
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# 기록된 오프셋이 없을 때 시작 위치
auto-offset-reset: earliest
# 수동 제어를 위해 자동 커밋 비활성화
enable-auto-commit: false
properties:
# 역직렬화에 신뢰되는 패키지
spring.json.trusted.packages: com.example.eventsenable-auto-commit 비활성화는 운영 환경의 필수 관행입니다. 수동 오프셋 커밋은 메시지가 실제 처리 완료 후에만 처리됨으로 표시되도록 보장합니다.
Kafka 프로듀서 만들기
KafkaTemplate은 Kafka로의 전송 로직을 캡슐화합니다. 직접 주입을 통해 비즈니스 서비스 내에서 즉시 사용할 수 있습니다.
@Service
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
// 대상 토픽(설정으로 외부화)
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
OrderPayload payload) {
// 메타데이터를 포함한 이벤트 생성
OrderEvent event = OrderEvent.created(payload);
// 파티션 키로 orderId 사용
// 동일 주문의 이벤트 순서를 보장
String partitionKey = payload.orderId().toString();
log.info("Publishing ORDER_CREATED event: {} to topic: {}",
event.eventId(), ordersTopic);
// 콜백을 사용하는 비동기 전송
return kafkaTemplate.send(ordersTopic, partitionKey, event)
.whenComplete((result, ex) -> {
if (ex == null) {
// 성공: 전송 메타데이터 로깅
RecordMetadata metadata = result.getRecordMetadata();
log.info("Event sent successfully: topic={}, partition={}, offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
// 실패: 조사용 오류 로깅
log.error("Failed to send event: {}", event.eventId(), ex);
}
});
}
}비즈니스 식별자 기반 파티션 키 사용은 동일 엔티티의 모든 이벤트가 같은 파티션에 떨어지도록 보장하여 시간 순서를 유지합니다.
null 키는 메시지를 파티션 사이에 라운드 로빈으로 분산합니다. 병렬성은 극대화되지만 순서 보장은 사라집니다. 키 선택은 비즈니스 요구사항에 달려 있습니다.
@KafkaListener를 사용한 기본 컨슈머
@KafkaListener 어노테이션은 메서드를 Kafka 컨슈머로 변환합니다. Spring은 폴링 루프, 역직렬화, 오프셋 커밋을 자동으로 처리합니다.
@Service
@Slf4j
public class OrderEventConsumer {
private final OrderProcessingService processingService;
public OrderEventConsumer(OrderProcessingService processingService) {
this.processingService = processingService;
}
@KafkaListener(
// 청취할 토픽
topics = "${app.kafka.topics.orders}",
// 컨슈머 그룹
groupId = "${spring.kafka.consumer.group-id}",
// 고급 설정용 커스텀 팩토리
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
// 자동으로 역직렬화된 페이로드
OrderEvent event,
// 주입된 Kafka 메타데이터
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
// 수동 커밋용 Acknowledgment
Acknowledgment acknowledgment) {
log.info("Received event: type={}, partition={}, offset={}",
event.eventType(), partition, offset);
try {
// 비즈니스 처리
processingService.process(event);
// 처리 성공 후에만 오프셋 커밋
acknowledgment.acknowledge();
log.info("Event processed successfully: {}", event.eventId());
} catch (Exception ex) {
// acknowledge() 미호출 시 재처리 발생
log.error("Failed to process event: {}", event.eventId(), ex);
throw ex;
}
}
}Acknowledgment 주입은 커밋을 명시적으로 제어할 수 있게 합니다. acknowledge() 호출이 없으면 오프셋은 커밋되지 않은 상태로 남아 다음 폴링에서 메시지가 재전송됩니다.
ConsumerFactory 고급 설정
기본 설정은 개발에 적합하지만 운영에서는 조정이 필요합니다. 커스텀 팩토리는 컨슈머 동작을 세밀하게 제어할 수 있게 합니다.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, OrderEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// 연결 설정
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// 역직렬화
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// 오프셋 관리
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 성능: 폴링당 레코드 수
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// 장애 감지를 위한 세션 타임아웃
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// 하트비트 간격(세션 타임아웃의 1/3)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
// 리밸런스 전 최대 처리 시간
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// JSON 역직렬화 설정
JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
deserializer.addTrustedPackages("com.example.events");
deserializer.setUseTypeMapperForKey(false);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
deserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 수동 acknowledgment 모드
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// 컨슈머 스레드 수
factory.setConcurrency(3);
// 배치 처리 비활성화(한 번에 한 메시지)
factory.setBatchListener(false);
return factory;
}
}MAX_POLL_INTERVAL_MS_CONFIG 매개변수는 poll() 호출 사이의 최대 지연을 정의합니다. 초과되면 컨슈머가 그룹에서 제외되고 리밸런스가 발생합니다. 이 값은 예상되는 최대 처리 시간을 반영해야 합니다.
Spring Boot 면접 준비가 되셨나요?
인터랙티브 시뮬레이터, flashcards, 기술 테스트로 연습하세요.
RetryTemplate을 활용한 재시도 전략
일시적 오류(서비스의 단기 가용 불가, 네트워크 타임아웃)는 자동 재시도를 필요로 합니다. Spring Kafka는 RetryTemplate과 통합되어 정교한 재시도 정책을 구현할 수 있게 합니다.
@Configuration
public class KafkaRetryConfig {
@Bean
public RetryTemplate kafkaRetryTemplate() {
RetryTemplate template = new RetryTemplate();
// 재시도 정책: 최대 3회
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
template.setRetryPolicy(retryPolicy);
// 지수 백오프: 1초, 2초, 4초 간격
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
retryableKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
RetryTemplate kafkaRetryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// 복구 콜백을 포함한 재시도 설정
factory.setCommonErrorHandler(
new DefaultErrorHandler(
// 복구: 재시도 소진 후 동작
(record, exception) -> {
log.error("All retries exhausted for record: key={}, value={}",
record.key(), record.value(), exception);
},
// 지수 백오프: 1초 시작, 최대 30초, 3회 시도
new ExponentialBackOff(1000L, 2.0)
)
);
return factory;
}
}DefaultErrorHandler는 Spring Kafka 2.8부터 기존 SeekToCurrentErrorHandler를 대체합니다. 더 명확한 API와 확장된 설정 옵션을 제공합니다.
Dead Letter Queue 구현
재시도가 소진되면 실패한 메시지는 추후 분석을 위해 Dead Letter Queue(DLQ)로 라우팅되어야 합니다. 이 접근은 데이터 손실을 방지하면서 컨슈머의 정체를 해소합니다.
@Configuration
public class DeadLetterConfig {
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
KafkaTemplate<String, Object> kafkaTemplate) {
// DLT 토픽 명명 규칙: original-topic.DLT
return new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, exception) -> {
// 원본 토픽 기반의 DLT 토픽
String dltTopic = record.topic() + ".DLT";
log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
dltTopic, record.key(), exception.getMessage());
return new TopicPartition(dltTopic, record.partition());
}
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
dltKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
// DLT 복구가 포함된 ErrorHandler
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
deadLetterPublishingRecoverer,
new FixedBackOff(1000L, 3)
);
// 재시도 불가 예외(DLT로 직접 전송)
errorHandler.addNotRetryableExceptions(
ValidationException.class,
JsonParseException.class,
NullPointerException.class
);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}재시도 가능한 예외와 그렇지 않은 예외를 구분하면 동작이 최적화됩니다. ValidationException은 재시도로 해결되지 않는 잘못된 형식의 데이터를 의미하므로 DLT 직접 라우팅을 정당화합니다.
수동 재처리를 위한 DLT 컨슈머
전용 컨슈머가 DLT를 모니터링하고 근본 문제 해결 후 메시지를 재처리할 수 있게 합니다.
@Service
@Slf4j
public class DeadLetterConsumer {
private final AlertingService alertingService;
private final FailedEventRepository failedEventRepository;
public DeadLetterConsumer(AlertingService alertingService,
FailedEventRepository failedEventRepository) {
this.alertingService = alertingService;
this.failedEventRepository = failedEventRepository;
}
@KafkaListener(
topics = "${app.kafka.topics.orders}.DLT",
groupId = "order-service-dlt"
)
public void handleDeadLetter(
ConsumerRecord<String, OrderEvent> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
@Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
@Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
@Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
Acknowledgment acknowledgment) {
log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
originalTopic, originalPartition, originalOffset, exceptionMessage);
// 분석과 추후 재처리를 위한 영속화
FailedEvent failedEvent = FailedEvent.builder()
.eventId(record.value().eventId())
.eventType(record.value().eventType())
.payload(serializePayload(record.value()))
.originalTopic(originalTopic)
.originalPartition(originalPartition)
.originalOffset(originalOffset)
.exceptionMessage(exceptionMessage)
.stacktrace(stacktrace)
.status(FailedEventStatus.PENDING)
.createdAt(Instant.now())
.build();
failedEventRepository.save(failedEvent);
// 사람의 개입을 위한 알림
alertingService.notifyDeadLetter(failedEvent);
acknowledgment.acknowledge();
}
}Spring Kafka는 DLT 메시지에 장애 메타데이터(예외, 스택트레이스, 원본 토픽, 파티션, 오프셋)를 담은 헤더를 자동으로 추가합니다. 이러한 정보는 진단을 용이하게 합니다.
컨슈머 측 멱등성 처리
Kafka는 "at least once" 전달을 보장합니다. 처리 후 커밋 전에 크래시가 발생하면 메시지가 여러 번 전달될 수 있습니다. 컨슈머 측 멱등성은 재처리에 따른 부작용을 방지합니다.
@Service
@Slf4j
public class IdempotentOrderProcessor {
private final ProcessedEventRepository processedEventRepository;
private final OrderService orderService;
public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
OrderService orderService) {
this.processedEventRepository = processedEventRepository;
this.orderService = orderService;
}
@Transactional
public void processIdempotent(OrderEvent event) {
String eventId = event.eventId();
// 검증: 이미 처리된 이벤트인가?
if (processedEventRepository.existsByEventId(eventId)) {
log.info("Event already processed, skipping: {}", eventId);
return;
}
// 비즈니스 처리
switch (event.eventType()) {
case "ORDER_CREATED" -> orderService.createOrder(event.payload());
case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
default -> log.warn("Unknown event type: {}", event.eventType());
}
// 동일 트랜잭션에서 처리 사실 기록
ProcessedEvent processed = ProcessedEvent.builder()
.eventId(eventId)
.eventType(event.eventType())
.processedAt(Instant.now())
.build();
processedEventRepository.save(processed);
log.info("Event processed and recorded: {}", eventId);
}
}@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {
// 최적 성능을 위한 eventId 인덱스
boolean existsByEventId(String eventId);
// 오래된 레코드 정리
@Modifying
@Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
int deleteOlderThan(@Param("before") Instant before);
}비즈니스 로직과 동일 트랜잭션에서 처리 사실을 기록하면 원자성이 보장됩니다. 처리 후 커밋 전 크래시가 발생해도 두 작업이 일관성 있게 다시 재생됩니다.
트랜잭션 Outbox 패턴
Outbox 패턴은 데이터베이스와 Kafka 사이의 일관성 문제를 해결합니다. Kafka로 직접 게시하는 대신, 이벤트는 먼저 "outbox" 테이블에 영속화된 뒤 전용 프로세스에 의해 중계됩니다.
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String eventId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String eventType;
@Column(columnDefinition = "TEXT", nullable = false)
private String payload;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private OutboxStatus status;
@Column(nullable = false)
private Instant createdAt;
private Instant publishedAt;
}
public enum OutboxStatus {
PENDING, PUBLISHED, FAILED
}@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
public OrderService(OrderRepository orderRepository,
OutboxRepository outboxRepository,
ObjectMapper objectMapper) {
this.orderRepository = orderRepository;
this.outboxRepository = outboxRepository;
this.objectMapper = objectMapper;
}
public Order createOrder(CreateOrderRequest request) {
// 주문 생성
Order order = Order.builder()
.customerId(request.customerId())
.items(request.items())
.totalAmount(calculateTotal(request.items()))
.status(OrderStatus.CREATED)
.createdAt(Instant.now())
.build();
Order savedOrder = orderRepository.save(order);
// 동일 트랜잭션에서 outbox 이벤트 생성
OrderEvent event = OrderEvent.created(toPayload(savedOrder));
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(event.eventId())
.aggregateType("Order")
.aggregateId(savedOrder.getId().toString())
.eventType(event.eventType())
.payload(serialize(event))
.status(OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxRepository.save(outboxEvent);
return savedOrder;
}
private String serialize(OrderEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize event", e);
}
}
}outbox 릴레이는 이벤트를 비동기적으로 Kafka에 게시합니다:
@Component
@Slf4j
public class OutboxRelayScheduler {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.kafka.topics.orders}")
private String ordersTopic;
public OutboxRelayScheduler(OutboxRepository outboxRepository,
KafkaTemplate<String, String> kafkaTemplate) {
this.outboxRepository = outboxRepository;
this.kafkaTemplate = kafkaTemplate;
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void relayPendingEvents() {
// 잠금과 함께 대기 이벤트 조회
List<OutboxEvent> pendingEvents = outboxRepository
.findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
for (OutboxEvent event : pendingEvents) {
try {
// Kafka에 게시
kafkaTemplate.send(
ordersTopic,
event.getAggregateId(),
event.getPayload()
).get(5, TimeUnit.SECONDS);
// 상태 업데이트
event.setStatus(OutboxStatus.PUBLISHED);
event.setPublishedAt(Instant.now());
outboxRepository.save(event);
log.debug("Outbox event published: {}", event.getEventId());
} catch (Exception ex) {
log.error("Failed to publish outbox event: {}",
event.getEventId(), ex);
event.setStatus(OutboxStatus.FAILED);
outboxRepository.save(event);
}
}
}
}이 패턴은 비즈니스 트랜잭션이 성공한 경우에만 이벤트가 게시되도록 보장하여 데이터베이스와 Kafka 사이의 불일치를 제거합니다.
연습을 시작하세요!
면접 시뮬레이터와 기술 테스트로 지식을 테스트하세요.
모니터링과 관측 가능성
Kafka 시스템을 감독하려면 컨슈밍 지연, 랙, 오류에 대한 메트릭이 필요합니다. Spring Boot Actuator는 이러한 메트릭을 Micrometer를 통해 노출합니다.
@Configuration
public class KafkaMetricsConfig {
@Bean
public MeterBinder kafkaConsumerMetrics(
ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
return registry -> {
// 사용자 정의 메트릭: 처리된 이벤트
Counter.builder("kafka.consumer.events.processed")
.description("Number of events successfully processed")
.tag("topic", "orders")
.register(registry);
// 사용자 정의 메트릭: 실패한 이벤트
Counter.builder("kafka.consumer.events.failed")
.description("Number of events that failed processing")
.tag("topic", "orders")
.register(registry);
};
}
}@Service
@Slf4j
public class InstrumentedOrderConsumer {
private final OrderProcessingService processingService;
private final MeterRegistry meterRegistry;
private final Counter processedCounter;
private final Counter failedCounter;
private final Timer processingTimer;
public InstrumentedOrderConsumer(OrderProcessingService processingService,
MeterRegistry meterRegistry) {
this.processingService = processingService;
this.meterRegistry = meterRegistry;
// 카운터 초기화
this.processedCounter = Counter.builder("kafka.consumer.events.processed")
.tag("topic", "orders")
.register(meterRegistry);
this.failedCounter = Counter.builder("kafka.consumer.events.failed")
.tag("topic", "orders")
.register(meterRegistry);
// 처리 지연 측정용 타이머
this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
.tag("topic", "orders")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
}
@KafkaListener(topics = "${app.kafka.topics.orders}")
public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
processingService.process(event);
acknowledgment.acknowledge();
processedCounter.increment();
sample.stop(processingTimer);
} catch (Exception ex) {
failedCounter.increment();
sample.stop(processingTimer);
throw ex;
}
}
}이러한 메트릭은 컨슈머 랙, 오류율, 처리 지연에 대한 알림을 구성할 수 있게 하며, 이는 사전 예방적 문제 감지에 필수적입니다.
결론
Spring Kafka는 회복탄력성을 갖춘 이벤트 기반 아키텍처 구축을 위한 견고한 통합을 제공합니다. 재시도 메커니즘, Dead Letter Queue, 멱등성을 마스터하는 것은 운영에 적합한 애플리케이션의 토대를 이룹니다.
Spring Kafka로 구축하는 이벤트 기반 아키텍처 체크리스트:
- ✅
AckMode.MANUAL로 수동 오프셋 커밋 구성 - ✅ 일관된 파티션 키를 사용하여 이벤트 순서 보존
- ✅
DefaultErrorHandler를 통해 지수 백오프 재시도 구현 - ✅ 실패한 메시지를 Dead Letter Queue로 라우팅
- ✅ eventId 추적을 통해 컨슈머 측 멱등성 보장
- ✅ 데이터베이스와 Kafka 사이의 일관성을 위해 Outbox 패턴 검토
- ✅ Micrometer를 통해 모니터링용 메트릭 노출
- ✅
MAX_POLL_INTERVAL_MS를 최대 처리 시간에 맞춰 조정
태그
공유
관련 기사

Spring Cloud Gateway 면접 대비: 라우팅, 필터, 로드 밸런싱
기술 면접을 위한 Spring Cloud Gateway 정복: 라우팅, 필터, 로드 밸런싱, API 게이트웨이 패턴을 다루는 12개의 질문과 코드 예제.

2026년 Spring Boot 로깅: Logback과 JSON으로 구현하는 운영 환경 구조화 로그
Spring Boot 구조화 로깅 완벽 가이드입니다. Logback JSON 설정, 추적용 MDC, 운영 환경 모범 사례, ELK Stack 연동을 다룹니다.

Spring GraphQL 면접: Resolver, DataLoader 및 N+1 문제 해결책
이 완전한 가이드로 Spring GraphQL 면접을 준비합니다. Resolver, DataLoader, N+1 문제 처리, mutation 및 기술 질문을 위한 모범 사례를 다룹니다.