データエンジニアのためのApache Kafka:ストリーミング、パーティション、面接対策

データエンジニア向けApache Kafkaの詳細解説。Kafka 4.xとKRaftを使ったストリーミングアーキテクチャ、パーティション戦略、コンシューマグループ、技術面接の頻出質問を実践的なコード例とともに紹介します。

Apache Kafkaのストリーミングアーキテクチャとパーティション構成図

Apache Kafkaは、現代のデータエンジニアリングスタックの中核に位置し、あらゆる規模の組織で毎日数兆件のイベントを処理しています。Kafka 4.xからはKRaftによる完全な自律動作が実現し(ZooKeeper依存の完全撤廃)、運用の複雑さが大幅に低減されました。リアルタイムデータパイプラインの業界標準としての信頼性はそのままに、よりシンプルなインフラストラクチャでの運用が可能になっています。

Kafka 4.x:KRaft専用モード

Apache Kafka 4.0以降、ZooKeeperは不要になりました。KRaft(Kafka Raft)がすべてのメタデータ管理をネイティブに処理し、運用の複雑さを削減するとともに、本番デプロイからインフラストラクチャコンポーネントを1つ完全に排除しました。

データパイプラインにおけるKafkaストリーミングアーキテクチャ

Kafkaは分散コミットログとして機能します。プロデューサーがトピックのパーティションの末尾にレコードを追記し、コンシューマがそれらのレコードを順序通りに読み取ります。この追記専用の設計により、リアルタイムストリーミングと任意のオフセットからのバッチリプレイの両方が可能となり、両方のパターンを必要とするデータエンジニアリングワークロードに最適な基盤を提供します。

Kafkaクラスタはブローカー(サーバー)、トピック(論理チャネル)、パーティション(トピックの物理シャード)で構成されます。各パーティションは、順序付けられた不変のレコードシーケンスです。1つのブローカーがパーティションリーダーとして機能し、すべての読み書きを処理します。フォロワーレプリカは耐障害性のためにコピーを維持します。

重要なアーキテクチャ上の特性として、順序保証はパーティション内でのみ有効であり、パーティション間では保証されません。この制約がデータパイプラインにおけるKafka設計の大部分を決定づけます。

yaml
# docker-compose.yml — Kafka 4.x KRaft cluster (no ZooKeeper)
services:
  kafka-1:
    image: apache/kafka:4.2.0
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
    ports:
      - "9092:9092"

この設定は、コントローラーも兼ねるKRaftモードのブローカーを起動します。KAFKA_CONTROLLER_QUORUM_VOTERS設定は、以前ZooKeeperが管理していたリーダー選出、トピックメタデータ、クラスタメンバーシップの役割を引き継ぎます。

パイプライン性能を左右するパーティション戦略

パーティションは並列度、順序保証、スループットを決定します。適切なパーティション数とキー戦略の選択が、パイプラインの信頼性とコンシューマのスケーラビリティに直接影響します。

パーティション数の指針:

  • トピックを同時に処理するコンシューマの予想数から開始する
  • 各パーティションは最新のハードウェアで約10MB/sの書き込みスループットを維持できる
  • パーティション数が多いとエンドツーエンドのレイテンシがわずかに増加する(リーダー選出やファイルハンドルの増加)
  • Kafka 4.xでは、KRaftのメタデータ改善により、ブローカーあたり数千のパーティションを効率的に処理できる

パーティションキーの選択によって、どのレコードがどのパーティションに配置されるかが決まります。同じキーを持つレコードは常に同じパーティションに送られ、そのキーに対する順序が保持されます。

python
# partition_strategy.py — Choosing the right partition key
from confluent_kafka import Producer
import json

def create_producer():
    return Producer({
        'bootstrap.servers': 'kafka-1:9092',
        'acks': 'all',
        'enable.idempotence': True,
        'max.in.flight.requests.per.connection': 5
    })

def publish_order_event(producer, event):
    # Key by customer_id: all events for one customer stay ordered
    key = str(event['customer_id']).encode('utf-8')
    value = json.dumps(event).encode('utf-8')
    producer.produce(
        topic='order-events',
        key=key,
        value=value,
        headers=[('source', b'order-service'), ('version', b'2')]
    )
    producer.flush()

def publish_clickstream(producer, event):
    # Key by session_id: ordering within a session matters
    # NOT by user_id — one user may have multiple sessions
    key = str(event['session_id']).encode('utf-8')
    value = json.dumps(event).encode('utf-8')
    producer.produce(
        topic='clickstream',
        key=key,
        value=value
    )

customer_idsession_idのどちらをパーティションキーにするかは、異なる順序要件を反映しています。注文イベントはトランザクションの一貫性を維持するために顧客単位の順序が必要です。クリックストリームイベントはユーザージャーニーを正確に再構成するためにセッション単位の順序が必要です。

ホットパーティションのリスク

特定のキーが他のキーよりも圧倒的に多くのレコードを生成する場合(例:1社のエンタープライズ顧客がイベントの80%を生成するケース)、そのパーティションがボトルネックになります。パーティションラグのメトリクスを監視し、偏りのあるワークロードには複合キーやカスタムパーティショナーの採用を検討してください。

コンシューマグループとオフセット管理

コンシューマグループにより、トピックの並列処理が可能になります。Kafkaはグループ内の各パーティションを正確に1つのコンシューマに割り当てるため、最大並列度はパーティション数と等しくなります。

オフセット管理は、Exactly-once(正確に1回)とAt-least-once(少なくとも1回)のセマンティクスを制御します。Kafkaはコミットされたオフセットを内部の__consumer_offsetsトピックに保存します。処理に対するオフセットコミットのタイミングが配信保証を決定します。

python
# consumer_pipeline.py — Consumer group with manual offset management
from confluent_kafka import Consumer, KafkaError
import json

def create_consumer(group_id: str):
    return Consumer({
        'bootstrap.servers': 'kafka-1:9092',
        'group.id': group_id,
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,  # Manual commit for at-least-once
        'max.poll.interval.ms': 300000,
        'session.timeout.ms': 45000,
        'isolation.level': 'read_committed'  # Only read committed transactions
    })

def process_batch(consumer: Consumer, batch_size: int = 500):
    """Process records in micro-batches for throughput."""
    consumer.subscribe(['order-events'])
    buffer = []

    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise Exception(msg.error())

        record = json.loads(msg.value().decode('utf-8'))
        buffer.append(record)

        if len(buffer) >= batch_size:
            # Process the full batch (write to warehouse, etc.)
            write_to_warehouse(buffer)
            # Commit AFTER successful processing
            consumer.commit(asynchronous=False)
            buffer.clear()

def write_to_warehouse(records: list):
    # Insert batch into data warehouse (BigQuery, Snowflake, etc.)
    pass

enable.auto.commitFalseに設定し、write_to_warehouseの成功後にのみコミットすることで、At-least-once配信が保証されます。コンシューマが処理後・コミット前にクラッシュした場合、再起動時にレコードが再配信されます。ウェアハウスに供給するデータパイプラインでは、これが通常正しいトレードオフです。ウェアハウスへの冪等な書き込みが重複を処理します。

データパイプライン統合のためのKafka Connect

Kafka Connectは、カスタムプロデューサー/コンシューマコードを記述せずに、Kafkaと外部システム間のデータ移動を実現するスケーラブルなフレームワークです。ソースコネクタがKafkaにデータを取り込み、シンクコネクタがデータを外部に送出します。

json
{
  "name": "postgres-cdc-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-primary",
    "database.port": "5432",
    "database.user": "replication_user",
    "database.dbname": "production",
    "topic.prefix": "cdc",
    "table.include.list": "public.orders,public.customers,public.products",
    "slot.name": "debezium_slot",
    "plugin.name": "pgoutput",
    "publication.name": "dbz_publication",
    "snapshot.mode": "initial",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "cdc\\.public\\.(.*)",
    "transforms.route.replacement": "warehouse.$1"
  }
}

このDebezium CDCコネクタは、PostgreSQLテーブルからのすべてのINSERT、UPDATE、DELETEをキャプチャし、Kafkaイベントとして発行します。RegexRouterトランスフォームがトピック名をcdc.public.ordersからwarehouse.ordersにリネームし、パイプラインの名前空間を整理します。BigQueryやSnowflakeのシンクコネクタと組み合わせることで、カスタムコード不要の完全自動CDCパイプラインが構築できます。

Data Engineeringの面接対策はできていますか?

インタラクティブなシミュレーター、flashcards、技術テストで練習しましょう。

Kafkaパイプラインにおける Exactly-Onceセマンティクス

Kafkaは冪等プロデューサーとトランザクションAPIを通じて、Exactly-onceセマンティクス(EOS)をサポートしています。データエンジニアリングパイプラインにおいて、EOSは重複排除ロジックを必要とせずにダウンストリームシステムでの重複レコードを防止します。

EOSを実現する3つのコンポーネント:

  1. 冪等プロデューサー: 各プロデューサーは一意のProducer IDを取得します。ブローカーはシーケンス番号を使用してリトライの重複を排除するため、ネットワークリトライが重複を生むことはありません。
  2. トランザクション: プロデューサーは複数のパーティションへの書き込みとオフセットのコミットを単一のトランザクションでアトミックに実行できます。すべての書き込みが成功するか、すべてが失敗するかのいずれかです。
  3. read_committed分離レベル: isolation.level=read_committedで設定されたコンシューマは、コミット済みトランザクションのレコードのみを参照します。
python
# exactly_once_pipeline.py — Transactional consume-transform-produce
from confluent_kafka import Consumer, Producer
import json

def transactional_pipeline():
    consumer = Consumer({
        'bootstrap.servers': 'kafka-1:9092',
        'group.id': 'etl-transformer',
        'enable.auto.commit': False,
        'isolation.level': 'read_committed'
    })

    producer = Producer({
        'bootstrap.servers': 'kafka-1:9092',
        'transactional.id': 'etl-transformer-001',
        'acks': 'all',
        'enable.idempotence': True
    })

    producer.init_transactions()
    consumer.subscribe(['raw-events'])

    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue

        # Begin atomic transaction
        producer.begin_transaction()
        try:
            raw = json.loads(msg.value())
            enriched = transform(raw)

            # Write transformed record to output topic
            producer.produce(
                'enriched-events',
                key=msg.key(),
                value=json.dumps(enriched).encode('utf-8')
            )

            # Commit consumer offset within the same transaction
            producer.send_offsets_to_transaction(
                consumer.position(consumer.assignment()),
                consumer.consumer_group_metadata()
            )
            producer.commit_transaction()
        except Exception:
            producer.abort_transaction()

def transform(record: dict) -> dict:
    # Apply business logic transformations
    record['processed_at'] = '2026-04-20T00:00:00Z'
    record['pipeline_version'] = '2.1'
    return record

transactional.id設定により、ブローカーはゾンビプロデューサーを遮断できます。コンシューマがクラッシュし、同じtransactional.idで新しいインスタンスが起動した場合、ブローカーは旧インスタンスの保留中のトランザクションをアボートし、出力の重複を防止します。

Share Groups:Kafka 4.2のキューセマンティクス

Kafka 4.2では、本番対応のShare Groupsが導入され、Kafkaに従来のメッセージキューセマンティクスがもたらされました。各パーティションがグループ内の1つのコンシューマにのみ割り当てられるコンシューマグループとは異なり、Share Groupsでは複数のコンシューマが同じパーティションのレコードを独立して処理できます。

| 特性 | コンシューマグループ | Share Groups | |---|---|---| | パーティション割り当て | 排他的(パーティションごとに1コンシューマ) | 共有(パーティションごとに複数コンシューマ) | | 順序保証 | パーティション内の順序を保持 | 順序保証なし | | ユースケース | 順序付きイベントストリーム | タスク分配、ワークキュー | | 確認応答 | オフセットベース(位置のコミット) | レコード単位(個別に確認応答) | | 最大並列度 | パーティション数で制限 | コンシューマ数で制限 |

Share Groupsは長年の課題であった、パーティション数を超えたコンシューマのスケーリングを解決します。通知配信やバッチジョブ配布など、順序が不要なワークロードでは、Kafka以外の外部キューシステムが不要になります。

Share Groupsの適用場面

Share Groupsはタスク分配型のワークロードに適しています:メール送信、アップロード処理、ML推論の実行など。イベントソーシング、CDC、または順序付き処理を必要とするパイプラインでは、コンシューマグループが引き続き正しい選択です。

データエンジニア向けKafka面接質問

データエンジニアリング職の技術面接では、Kafkaに関する知識が頻繁に問われます。以下の質問は、最も一般的に評価されるコンセプトをカバーしています。

Q: Kafkaはどのようにメッセージの順序を保証しますか? 順序は単一パーティション内でのみ保証されます。同じパーティションキーを持つすべてのレコードは同じパーティションに送られ、順序通りに追記されます。パーティション間では順序は存在しません。適切なパーティションキーの選択が、Kafkaパイプラインにおける順序制御の主要なメカニズムです。

Q: グループ内のコンシューマが障害を起こした場合、何が起きますか? グループコーディネーターがハートビートの欠落により障害を検出します(session.timeout.msで制御)。リバランスがトリガーされ、障害のあったコンシューマのパーティションが残りのグループメンバーに再割り当てされます。リバランス中、消費は一時的に停止します。Kafka 4.xでデフォルトの協調的スティッキーリバランシングにより、影響を受けるパーティションのみが再割り当てされ、中断が最小限に抑えられます。

Q: acks=1acks=allの違いを説明してください。 acks=1では、リーダーレプリカがレコードを永続化した後にブローカーが書き込みを確認応答します。acks=allでは、すべてのIn-Syncレプリカ(ISR)が書き込みを確認するまで待機します。acks=allmin.insync.replicas=2と組み合わせることで、単一ブローカーの障害時にもデータ損失がないことを保証します。レイテンシはわずかに増加します。

Q: Kafkaを使用したCDCパイプラインをどのように設計しますか? Debezium(PostgreSQL、MySQL用)またはネイティブCDCコネクタを使用してソースデータベースの変更をキャプチャします。変更イベントをプライマリキーでパーティション分割されたKafkaトピックに発行します。シンクコネクタ(BigQuery Sink、S3 Sink)を使用して、変更を分析用ウェアハウスにロードします。コンシューマにisolation.level=read_committedを設定し、未コミットのデータベーストランザクションの読み取りを回避します。変更イベントのAvroまたはProtobufスキーマの管理にはスキーマレジストリを使用します。

Q: ISRとは何で、なぜ重要ですか? ISR(In-Sync Replicas)は、パーティションリーダーに完全に同期しているレプリカのセットです。リーダー選出の対象はISRメンバーのみです。min.insync.replicas設定は、書き込みがコミット済みとみなされるために確認応答が必要なレプリカ数を定義します。ISRがmin.insync.replicasを下回ると、ブローカーはデータ損失のリスクを回避するために書き込みを拒否します。

データエンジニアリングの面接対策については、データエンジニアリング面接質問集ETL/ELTパイプラインパターンデータモデリングなどの幅広いトピックを取り扱っています。

今すぐ練習を始めましょう!

面接シミュレーターと技術テストで知識をテストしましょう。

まとめ

  • Kafka 4.xのKRaftにより、ZooKeeperが完全に排除され、データエンジニアリングチームの運用オーバーヘッドが削減されます
  • パーティションキーの選択が順序保証を決定します。利便性ではなく、ダウンストリームの処理要件に基づいてキーを選択する必要があります
  • enable.auto.commit=Falseによる手動オフセットコミットでAt-least-once配信が可能になり、トランザクションAPIはconsume-transform-produceパイプラインにExactly-onceセマンティクスを提供します
  • Kafka Connect + Debeziumにより、カスタムコンシューマコード不要で本番対応のCDCが実現します
  • Share Groups(Kafka 4.2)は、順序が不要なタスク分配ワークロードにキューセマンティクスを追加します
  • コンシューマグループのスケーリングはパーティション数に制約されるため、ピーク時のコンシューマ並列度に基づいてパーティション数を計画してください

今すぐ練習を始めましょう!

面接シミュレーターと技術テストで知識をテストしましょう。

タグ

#kafka
#streaming
#data-engineering
#partitions
#consumer-groups
#kraft
#event-driven

共有

関連記事