Spring Kafka: 耐障害性のあるコンシューマによるイベント駆動アーキテクチャ

イベント駆動アーキテクチャ向けの完全なSpring Kafkaガイド。設定、耐障害性のあるコンシューマ、リトライポリシー、Dead Letter Queue、分散アプリケーション向けの本番パターン。

Spring Kafkaと耐障害性のあるコンシューマによるイベント駆動アーキテクチャ

Apache Kafkaは大規模なイベント駆動アーキテクチャの事実上の標準となっています。Spring Kafkaは、Spring Bootアプリケーションへの統合を簡素化しつつ、本番環境に不可欠な耐障害性メカニズムを提供します。本ガイドでは、設定、コンシューマパターン、エラー処理戦略を詳しく解説します。

前提条件

本ガイドは、Kafkaの基本概念であるトピック、パーティション、コンシューマグループ、オフセットへの理解を前提としています。重点は、Springとの統合と耐障害性パターンに置かれています。

なぜイベント駆動アーキテクチャを選ぶのか

イベント駆動アーキテクチャは、非同期イベントを介してシステムコンポーネントを疎結合にします。同期的なREST呼び出しとは異なり、プロデューサは応答を待たずにイベントを発行し、コンシューマは自分のペースで処理を進められます。

このアプローチは、サービスごとの独立した水平スケーラビリティ、一時的な障害に対する耐性の向上、そしてKafkaの不変ログによる完全なトレーサビリティといった重要な利点をもたらします。

OrderEvent.javajava
public record OrderEvent(
    // 冪等性のための一意なイベントID
    String eventId,
    // ルーティング用のイベントタイプ
    String eventType,
    // 作成タイムスタンプ
    Instant createdAt,
    // 業務ペイロード
    OrderPayload payload
) {
    // eventIdの一意性を保証するファクトリメソッド
    public static OrderEvent created(OrderPayload payload) {
        return new OrderEvent(
            UUID.randomUUID().toString(),
            "ORDER_CREATED",
            Instant.now(),
            payload
        );
    }
}

public record OrderPayload(
    Long orderId,
    Long customerId,
    List<OrderItem> items,
    BigDecimal totalAmount
) {}

イベント構造には、一意なID、タイプ、タイムスタンプが必ず含まれます。これらのメタデータにより、コンシューマ側でのフィルタリング、重複検出、時系列での追跡が可能になります。

Spring Kafkaの基本設定

統合はspring-kafkaスターターと最小限のYAML設定から始まります。Spring Bootは、生成側のKafkaTemplateと消費側のConcurrentKafkaListenerContainerFactoryという主要なBeanを自動構成します。

yaml
# application.yml
spring:
  kafka:
    # Kafkaブローカーのアドレス(クラスター)
    bootstrap-servers: localhost:9092

    # プロデューサ設定
    producer:
      # 文字列キーのシリアライゼーション
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # JSON値のシリアライゼーション
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # すべてのレプリカからの確認を待機
      acks: all
      # ネットワーク障害時の再試行回数
      retries: 3

    # コンシューマ設定
    consumer:
      # コンシューマグループの識別子
      group-id: order-service
      # キーのデシリアライゼーション
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # ターゲット型を指定したJSONデシリアライゼーション
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # 記録済みオフセットがない場合の開始位置
      auto-offset-reset: earliest
      # 手動制御のために自動コミットを無効化
      enable-auto-commit: false
      properties:
        # デシリアライゼーションのための信頼パッケージ
        spring.json.trusted.packages: com.example.events

enable-auto-commitの無効化は本番環境で欠かせない実践です。手動でのオフセットコミットにより、メッセージは実際の処理が完了したあとにのみ処理済みとしてマークされます。

Kafkaプロデューサの作成

KafkaTemplateはKafkaへの送信ロジックをカプセル化します。直接インジェクションすることで、業務サービス内ですぐに利用できます。

OrderEventPublisher.javajava
@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // 宛先トピック(設定で外部化)
    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderCreated(
            OrderPayload payload) {

        // メタデータ付きでイベントを作成
        OrderEvent event = OrderEvent.created(payload);

        // パーティションキーとしてorderIdを使用
        // 同一注文のイベント順序を保証
        String partitionKey = payload.orderId().toString();

        log.info("Publishing ORDER_CREATED event: {} to topic: {}",
            event.eventId(), ordersTopic);

        // コールバック付きの非同期送信
        return kafkaTemplate.send(ordersTopic, partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // 成功: 送信メタデータをログ出力
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // 失敗: 調査用にエラーをログ出力
                    log.error("Failed to send event: {}", event.eventId(), ex);
                }
            });
    }
}

業務識別子に基づくパーティションキーを使うと、同じエンティティに関するすべてのイベントが同じパーティションに配置され、時系列順が保たれます。

パーティションキー

キーがnullの場合、メッセージはパーティション間でラウンドロビン分散されます。並列性は最大化されますが、順序保証は失われます。キーの選択は業務要件に依存します。

@KafkaListenerによる基本コンシューマ

@KafkaListenerアノテーションはメソッドをKafkaコンシューマへ変換します。Springはポーリングループ、デシリアライゼーション、オフセットコミットを自動的に処理します。

OrderEventConsumer.javajava
@Service
@Slf4j
public class OrderEventConsumer {

    private final OrderProcessingService processingService;

    public OrderEventConsumer(OrderProcessingService processingService) {
        this.processingService = processingService;
    }

    @KafkaListener(
        // 受信対象のトピック
        topics = "${app.kafka.topics.orders}",
        // コンシューマグループ
        groupId = "${spring.kafka.consumer.group-id}",
        // 拡張設定向けのカスタムファクトリ
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            // 自動デシリアライズされたペイロード
            OrderEvent event,
            // 注入されたKafkaメタデータ
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            // 手動コミット用のAcknowledgment
            Acknowledgment acknowledgment) {

        log.info("Received event: type={}, partition={}, offset={}",
            event.eventType(), partition, offset);

        try {
            // 業務処理
            processingService.process(event);

            // 処理が成功したあとにのみオフセットをコミット
            acknowledgment.acknowledge();

            log.info("Event processed successfully: {}", event.eventId());

        } catch (Exception ex) {
            // acknowledge()を呼ばないと再処理が発生
            log.error("Failed to process event: {}", event.eventId(), ex);
            throw ex;
        }
    }
}

Acknowledgmentを注入することで、コミットを明示的に制御できます。acknowledge()を呼び出さない場合、オフセットは未コミットのままで、次のポーリング時にメッセージが再配信されます。

ConsumerFactoryの高度な設定

デフォルト設定は開発には適していますが、本番環境ではチューニングが必要です。カスタムファクトリにより、コンシューマの動作を細かく制御できます。

KafkaConsumerConfig.javajava
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // 接続設定
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // デシリアライゼーション
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);

        // オフセット管理
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // パフォーマンス: ポーリングごとのレコード数
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // 障害検出のためのセッションタイムアウト
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // ハートビート間隔(セッションタイムアウトの1/3)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // リバランス前の最大処理時間
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // JSONデシリアライザの設定
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        deserializer.addTrustedPackages("com.example.events");
        deserializer.setUseTypeMapperForKey(false);

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // 手動Acknowledgmentモード
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // コンシューマスレッド数
        factory.setConcurrency(3);

        // バッチ処理は無効化(一度に1メッセージ)
        factory.setBatchListener(false);

        return factory;
    }
}

MAX_POLL_INTERVAL_MS_CONFIGパラメータはpoll()呼び出し間の最大遅延を定義します。これを超えると、コンシューマはグループから除外されリバランスが発生します。値は想定される最大処理時間を反映するべきです。

Spring Bootの面接対策はできていますか?

インタラクティブなシミュレーター、flashcards、技術テストで練習しましょう。

RetryTemplateによる再試行戦略

一時的なエラー(サービスの短時間の利用不能、ネットワークタイムアウト)は自動的な再試行を必要とします。Spring KafkaはRetryTemplateと統合し、洗練された再試行ポリシーを実装できます。

KafkaRetryConfig.javajava
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // 再試行ポリシー: 最大3回
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // 指数バックオフ: 1秒, 2秒, 4秒
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            retryableKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                RetryTemplate kafkaRetryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // リカバリコールバック付きの再試行設定
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                // リカバリ: 試行回数を使い切ったあとの処理
                (record, exception) -> {
                    log.error("All retries exhausted for record: key={}, value={}",
                        record.key(), record.value(), exception);
                },
                // 指数バックオフ: 初期1秒、最大30秒、3回試行
                new ExponentialBackOff(1000L, 2.0)
            )
        );

        return factory;
    }
}

DefaultErrorHandlerはSpring Kafka 2.8以降、従来のSeekToCurrentErrorHandlerを置き換えます。よりわかりやすいAPIと拡張された設定オプションを備えています。

Dead Letter Queueの実装

再試行が尽きたあと、失敗したメッセージは後で分析するためにDead Letter Queue(DLQ)へルーティングするべきです。このアプローチはデータの損失を防ぎ、コンシューマのブロックを解除します。

DeadLetterConfig.javajava
@Configuration
public class DeadLetterConfig {

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // DLTトピックの命名戦略: original-topic.DLT
        return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, exception) -> {
                // 元トピックに基づくDLTトピック
                String dltTopic = record.topic() + ".DLT";

                log.warn("Sending failed record to DLT: topic={}, key={}, error={}",
                    dltTopic, record.key(), exception.getMessage());

                return new TopicPartition(dltTopic, record.partition());
            }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            dltKafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL);

        // DLTリカバリ付きのErrorHandler
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer,
            new FixedBackOff(1000L, 3)
        );

        // 再試行不可な例外(直接DLTへ送信)
        errorHandler.addNotRetryableExceptions(
            ValidationException.class,
            JsonParseException.class,
            NullPointerException.class
        );

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}

再試行可能と再試行不可の例外を区別すると挙動が最適化されます。ValidationExceptionは再試行では解消しない不正データを示し、直接DLTへルーティングする根拠となります。

手動再処理用のDLTコンシューマ

専用のコンシューマがDLTを監視し、根本原因を解消したあとにメッセージを再処理できるようにします。

DeadLetterConsumer.javajava
@Service
@Slf4j
public class DeadLetterConsumer {

    private final AlertingService alertingService;
    private final FailedEventRepository failedEventRepository;

    public DeadLetterConsumer(AlertingService alertingService,
                              FailedEventRepository failedEventRepository) {
        this.alertingService = alertingService;
        this.failedEventRepository = failedEventRepository;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.orders}.DLT",
        groupId = "order-service-dlt"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, OrderEvent> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment acknowledgment) {

        log.error("Dead letter received: topic={}, partition={}, offset={}, error={}",
            originalTopic, originalPartition, originalOffset, exceptionMessage);

        // 分析と後の再処理のための永続化
        FailedEvent failedEvent = FailedEvent.builder()
            .eventId(record.value().eventId())
            .eventType(record.value().eventType())
            .payload(serializePayload(record.value()))
            .originalTopic(originalTopic)
            .originalPartition(originalPartition)
            .originalOffset(originalOffset)
            .exceptionMessage(exceptionMessage)
            .stacktrace(stacktrace)
            .status(FailedEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        failedEventRepository.save(failedEvent);

        // 人手による対応のためのアラート
        alertingService.notifyDeadLetter(failedEvent);

        acknowledgment.acknowledge();
    }
}
DLTヘッダー

Spring KafkaはDLTメッセージに、障害メタデータを含むヘッダー(例外、スタックトレース、元トピック、パーティション、オフセット)を自動的に付加します。これらの情報により診断が容易になります。

コンシューマ側での冪等性管理

Kafkaは「at least once」の配信を保証します。処理後コミット前にクラッシュが発生した場合、メッセージは複数回配信される可能性があります。コンシューマ側の冪等性により、再処理による副作用を防げます。

IdempotentOrderProcessor.javajava
@Service
@Slf4j
public class IdempotentOrderProcessor {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderService orderService;

    public IdempotentOrderProcessor(ProcessedEventRepository processedEventRepository,
                                    OrderService orderService) {
        this.processedEventRepository = processedEventRepository;
        this.orderService = orderService;
    }

    @Transactional
    public void processIdempotent(OrderEvent event) {
        String eventId = event.eventId();

        // 確認: イベントは既に処理されているか?
        if (processedEventRepository.existsByEventId(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }

        // 業務処理
        switch (event.eventType()) {
            case "ORDER_CREATED" -> orderService.createOrder(event.payload());
            case "ORDER_UPDATED" -> orderService.updateOrder(event.payload());
            case "ORDER_CANCELLED" -> orderService.cancelOrder(event.payload());
            default -> log.warn("Unknown event type: {}", event.eventType());
        }

        // 同一トランザクション内で処理を記録
        ProcessedEvent processed = ProcessedEvent.builder()
            .eventId(eventId)
            .eventType(event.eventType())
            .processedAt(Instant.now())
            .build();

        processedEventRepository.save(processed);

        log.info("Event processed and recorded: {}", eventId);
    }
}
ProcessedEventRepository.javajava
@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, Long> {

    // 最適なパフォーマンスのためのeventIdへのインデックス
    boolean existsByEventId(String eventId);

    // 古いレコードの削除
    @Modifying
    @Query("DELETE FROM ProcessedEvent e WHERE e.processedAt < :before")
    int deleteOlderThan(@Param("before") Instant before);
}

業務ロジックと同じトランザクションで処理記録を行うことで、原子性が保証されます。処理後コミット前にクラッシュしても、両方の操作が一貫して再実行されます。

Transactional Outboxパターン

Outboxパターンはデータベースとkafkaの間の整合性問題を解決します。Kafkaへ直接発行する代わりに、イベントは「outbox」テーブルに永続化され、専用プロセスによって中継されます。

OutboxEvent.javajava
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private OutboxStatus status;

    @Column(nullable = false)
    private Instant createdAt;

    private Instant publishedAt;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}
OrderService.javajava
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository,
                       OutboxRepository outboxRepository,
                       ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(CreateOrderRequest request) {
        // 注文を作成
        Order order = Order.builder()
            .customerId(request.customerId())
            .items(request.items())
            .totalAmount(calculateTotal(request.items()))
            .status(OrderStatus.CREATED)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);

        // 同一トランザクション内でoutboxイベントを作成
        OrderEvent event = OrderEvent.created(toPayload(savedOrder));

        OutboxEvent outboxEvent = OutboxEvent.builder()
            .eventId(event.eventId())
            .aggregateType("Order")
            .aggregateId(savedOrder.getId().toString())
            .eventType(event.eventType())
            .payload(serialize(event))
            .status(OutboxStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize event", e);
        }
    }
}

outboxリレイヤーは非同期にイベントをKafkaへ発行します:

OutboxRelayScheduler.javajava
@Component
@Slf4j
public class OutboxRelayScheduler {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.kafka.topics.orders}")
    private String ordersTopic;

    public OutboxRelayScheduler(OutboxRepository outboxRepository,
                               KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relayPendingEvents() {
        // ロックを伴って未送信イベントを取得
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : pendingEvents) {
            try {
                // Kafkaへ発行
                kafkaTemplate.send(
                    ordersTopic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);

                // 状態を更新
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);

                log.debug("Outbox event published: {}", event.getEventId());

            } catch (Exception ex) {
                log.error("Failed to publish outbox event: {}",
                    event.getEventId(), ex);
                event.setStatus(OutboxStatus.FAILED);
                outboxRepository.save(event);
            }
        }
    }
}

このパターンにより、業務トランザクションが成功した場合にのみイベントが発行されることが保証され、データベースとKafka間の不整合が解消されます。

今すぐ練習を始めましょう!

面接シミュレーターと技術テストで知識をテストしましょう。

モニタリングと可観測性

Kafkaシステムを監督するには、消費レイテンシ、ラグ、エラーに関するメトリクスが必要です。Spring Boot ActuatorはこれらをMicrometer経由で公開します。

KafkaMetricsConfig.javajava
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterBinder kafkaConsumerMetrics(
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        return registry -> {
            // カスタムメトリクス: 処理済みイベント
            Counter.builder("kafka.consumer.events.processed")
                .description("Number of events successfully processed")
                .tag("topic", "orders")
                .register(registry);

            // カスタムメトリクス: 失敗したイベント
            Counter.builder("kafka.consumer.events.failed")
                .description("Number of events that failed processing")
                .tag("topic", "orders")
                .register(registry);
        };
    }
}
InstrumentedOrderConsumer.javajava
@Service
@Slf4j
public class InstrumentedOrderConsumer {

    private final OrderProcessingService processingService;
    private final MeterRegistry meterRegistry;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public InstrumentedOrderConsumer(OrderProcessingService processingService,
                                     MeterRegistry meterRegistry) {
        this.processingService = processingService;
        this.meterRegistry = meterRegistry;

        // カウンターの初期化
        this.processedCounter = Counter.builder("kafka.consumer.events.processed")
            .tag("topic", "orders")
            .register(meterRegistry);

        this.failedCounter = Counter.builder("kafka.consumer.events.failed")
            .tag("topic", "orders")
            .register(meterRegistry);

        // 処理レイテンシ計測のタイマー
        this.processingTimer = Timer.builder("kafka.consumer.processing.duration")
            .tag("topic", "orders")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
    }

    @KafkaListener(topics = "${app.kafka.topics.orders}")
    public void handleOrderEvent(OrderEvent event, Acknowledgment acknowledgment) {

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            processingService.process(event);
            acknowledgment.acknowledge();

            processedCounter.increment();
            sample.stop(processingTimer);

        } catch (Exception ex) {
            failedCounter.increment();
            sample.stop(processingTimer);
            throw ex;
        }
    }
}

これらのメトリクスは、コンシューマのラグ、エラー率、処理レイテンシに関するアラートの設定を可能にし、問題の予防的検知に欠かせません。

まとめ

Spring Kafkaは、耐障害性の高いイベント駆動アーキテクチャを構築するための堅牢な統合を提供します。再試行メカニズム、Dead Letter Queue、冪等性の習得は、本番運用に耐えるアプリケーションの基盤となります。

Spring Kafkaによるイベント駆動アーキテクチャのチェックリスト:

  • AckMode.MANUALで手動オフセットコミットを設定する
  • ✅ 一貫したパーティションキーを用いてイベント順序を保つ
  • DefaultErrorHandlerで指数バックオフ付きの再試行を実装する
  • ✅ 失敗したメッセージをDead Letter Queueへルーティングする
  • ✅ eventIdの追跡によりコンシューマ側の冪等性を保証する
  • ✅ データベースとKafkaの整合性のためにOutboxパターンを検討する
  • ✅ Micrometer経由でメトリクスを公開しモニタリングに活用する
  • MAX_POLL_INTERVAL_MSを最大処理時間に合わせて調整する

タグ

#spring kafka
#event-driven
#kafka consumer
#microservices
#resilience

共有

関連記事