Spring Batch 5 面接対策: パーティショニング・チャンク・フォールトトレランス
Spring Batch 5 の面接を制覇する15の必須質問。パーティショニング、チャンク処理、フォールトトレランスを Java 21 のコード例と共に解説します。

Spring Batch 5 は、Spring エコシステムにおける大規模データ処理の中核を担います。技術面接では、堅牢でスケーラブルかつフォールトトレラントなジョブを設計する能力が評価されます。パーティショニング、チャンク指向処理、フォールトトレランス機構の習熟が、シニア開発者を際立たせます。
採用担当者は深い理解を確認します。なぜ remote chunking ではなく partitioning を選ぶのか。チャンクサイズはどのように見積もるのか。こうしたアーキテクチャ判断には、本番運用の経験がにじみ出ます。
Spring Batch 5 の基本アーキテクチャ
質問 1: Spring Batch の主な構成要素は何ですか
Spring Batch のアーキテクチャは三層からなります。アプリケーション(ジョブとビジネスコード)、Batch Core(ジョブを起動・制御するランタイムクラス)、インフラストラクチャ(reader、writer、RetryTemplate などの共通サービス)です。
// Java 21 を用いた Spring Batch 5 のジョブ設定
@Configuration
public class BatchJobConfig {
// JobRepository は実行メタデータを保存します
// ジョブのリスタートと監視を可能にします
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public BatchJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Job はバッチプロセス全体をカプセル化します
// 一つ以上の Step が順序付きで実行されます
@Bean
public Job importUserJob(Step processUsersStep, Step cleanupStep) {
return new JobBuilder("importUserJob", jobRepository)
.start(processUsersStep) // メイン処理ステップ
.next(cleanupStep) // クリーンアップステップ
.build();
}
// Step は独立した作業単位を表します
// 二つのモデル: Tasklet(単一タスク)または Chunk(反復処理)
@Bean
public Step processUsersStep(ItemReader<UserRecord> reader,
ItemProcessor<UserRecord, User> processor,
ItemWriter<User> writer) {
return new StepBuilder("processUsersStep", jobRepository)
.<UserRecord, User>chunk(100, transactionManager) // 100件ごとにコミット
.reader(reader) // 元データを読み取り
.processor(processor) // 各アイテムを変換
.writer(writer) // 100件単位で書き込み
.build();
}
}JobRepository は実行状態をデータベースに永続化します。この永続化により、失敗したジョブを停止地点から再開でき、コミット済みデータを再処理する必要がありません。
質問 2: Tasklet とチャンク指向処理の違いは何ですか
Tasklet は離散的で反復しないアクション(ファイル削除、ストアドプロシージャ呼び出し、通知メール送信など)を実行します。Chunk はデータを管理可能なバッチに分割して大量のデータを処理します。
// Tasklet: 反復のない単一アクション
@Component
public class CleanupTasklet implements Tasklet {
private final Path tempDirectory = Path.of("/tmp/batch-work");
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// 処理に伴う一時ファイルをすべて削除
try (var files = Files.walk(tempDirectory)) {
files.filter(Files::isRegularFile)
.forEach(this::deleteQuietly);
}
// FINISHED は tasklet が処理を完了したことを示します
// CONTINUABLE を返すと再実行されます(ポーリング向け)
return RepeatStatus.FINISHED;
}
private void deleteQuietly(Path file) {
try {
Files.delete(file);
} catch (IOException e) {
// ログ出力して継続 - 1ファイルでジョブを失敗させない
}
}
}// チャンク処理: 大量処理向け
@Configuration
public class ChunkProcessingConfig {
@Bean
public Step processOrdersStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("processOrdersStep", jobRepository)
// 500件のチャンク: 500件読み込み・処理・書き込み・コミット
.<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// 進捗監視のリスナー
.listener(new ChunkProgressListener())
.build();
}
}チャンク指向処理には大きな利点があります。最適化されたメモリ管理(現在のチャンクのみ保持)、粒度の細かいトランザクション(チャンクごとにコミット)、最終コミット済みチャンクからの障害回復です。
チャンク指向処理の詳細
質問 3: チャンクのライフサイクルはどのように動作しますか
各チャンクは厳密なサイクルに従います。設定サイズに達するまでアイテムを一件ずつ読み取り、各アイテムを個別に処理し、最後にグループとして書き込みます。トランザクションがチャンク全体を包みます。
// ItemReader: 一件ずつ読み取ります
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {
// @StepScope: Step 実行ごとに新しいインスタンス
// 動的なジョブパラメーターを注入可能にします
@Value("#{jobParameters['startDate']}")
private LocalDate startDate;
private Iterator<OrderRecord> orderIterator;
@BeforeStep
public void initializeReader(StepExecution stepExecution) {
// Step 開始時にデータをロード
List<OrderRecord> orders = fetchOrdersFromDate(startDate);
this.orderIterator = orders.iterator();
}
@Override
public OrderRecord read() {
// データ終端を示すため null を返す
// Spring Batch は null を受け取るまで read() を呼び続けます
if (orderIterator.hasNext()) {
return orderIterator.next();
}
return null; // データセットの終端
}
private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
// データソースから取得
return List.of(); // 実際の実装
}
}// ItemProcessor: 各アイテムを個別に変換
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {
private final PricingService pricingService;
private final ValidationService validationService;
public OrderItemProcessor(PricingService pricingService,
ValidationService validationService) {
this.pricingService = pricingService;
this.validationService = validationService;
}
@Override
public ProcessedOrder process(OrderRecord item) {
// null を返すとアイテムをフィルタ(書き込まれない)
if (!validationService.isValid(item)) {
return null; // アイテムを除外
}
// ビジネス変換
BigDecimal finalPrice = pricingService.calculatePrice(item);
return new ProcessedOrder(
item.orderId(),
item.customerId(),
finalPrice,
LocalDateTime.now()
);
}
}// ItemWriter: チャンク全体を一回の操作で書き込み
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
public OrderItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// チャンクは処理済みの全アイテムを保持
// 最適なパフォーマンスのためバッチ書き込み
List<? extends ProcessedOrder> items = chunk.getItems();
jdbcTemplate.batchUpdate(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
items,
items.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
}
);
}
}チャンク処理中に例外が発生するとトランザクションはロールバックされます。その後、JobRepository に保存されたメタデータを使ってそのチャンクから再開できます。
質問 4: 最適なチャンクサイズの選び方は
チャンクサイズはパフォーマンスとメモリ消費に直結します。小さすぎるとコミット回数が増え(オーバーヘッド)、大きすぎるとメモリを過剰に消費し障害時のロールバックが長引きます。
// チャンクサイズの動的設定
@Configuration
public class ChunkSizingConfig {
// 多くの場合に妥当なデフォルト値
private static final int DEFAULT_CHUNK_SIZE = 100;
// 軽量アイテム(フィールドが少ない)向け
private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;
// 重量アイテム(blob、ドキュメント)向け
private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;
@Bean
public Step processLightDataStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<LightRecord> reader,
ItemWriter<LightRecord> writer) {
return new StepBuilder("processLightDataStep", jobRepository)
// 軽量アイテム: コミット回数を減らすため大きめのチャンク
.<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Step processDocumentsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<Document> reader,
ItemProcessor<Document, ProcessedDocument> processor,
ItemWriter<ProcessedDocument> writer) {
return new StepBuilder("processDocumentsStep", jobRepository)
// 重いドキュメント: メモリ抑制のため小さめのチャンク
.<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}チャンクあたり 100 件から始め、コミット時間、メモリ使用量、ロールバック時間といった指標を見て調整します。リスナーで監視し、最適点を見極めましょう。
並列処理のためのパーティショニング
質問 5: パーティショニングとは何で、いつ使いますか
パーティショニングはデータセットを独立したパーティションに分け、並列処理します。各パーティションは独自のスレッド(ローカル)またはリモート worker で動きます。このアプローチにより、リスタート性能を犠牲にせずスループットを倍増できます。
// パーティション化されたジョブの設定
@Configuration
public class PartitionedJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public PartitionedJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
@Bean
public Job partitionedImportJob(Step partitionedStep) {
return new JobBuilder("partitionedImportJob", jobRepository)
.start(partitionedStep)
.build();
}
// マネージャー Step: パーティションを統制
@Bean
public Step partitionedStep(Partitioner partitioner,
Step workerStep,
TaskExecutor taskExecutor) {
return new StepBuilder("partitionedStep", jobRepository)
// Partitioner で作業を分割
.partitioner("workerStep", partitioner)
// 各パーティションで実行する Step
.step(workerStep)
// 8 並列スレッド
.taskExecutor(taskExecutor)
// 作成するパーティション数
.gridSize(8)
.build();
}
// 並列実行用の TaskExecutor
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("batch-partition-");
executor.initialize();
return executor;
}
}// ID 範囲ベースの Partitioner
@Component
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// データセットの境界を取得
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);
if (minId == null || maxId == null) {
return Map.of(); // 処理対象データなし
}
// 各パーティションのサイズを計算
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long start = minId + (i * range);
long end = Math.min(start + range - 1, maxId);
// 各パーティションが境界値を受け取る
context.putLong("minId", start);
context.putLong("maxId", end);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}パーティショニングはアイテムが互いに独立した大規模データセットに適しています。遅いパーティションがジョブ全体を遅らせないよう、バランスを取ることが重要です。
質問 6: ローカルパーティショニングとリモートパーティショニングの違いは何ですか
ローカルパーティショニング はすべてのパーティションを同一 JVM 内のスレッドプールで実行します。リモートパーティショニング はメッセージングミドルウェアを介して複数の JVM(worker)にパーティションを分散します。
// メッセージングを用いたリモートパーティショニング設定
@Configuration
public class RemotePartitioningConfig {
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
MessageChannelPartitionHandler partitionHandler) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
// リモート worker と通信するハンドラ
.partitionHandler(partitionHandler)
.build();
}
// PartitionHandler は ExecutionContext を worker へ送信
@Bean
public MessageChannelPartitionHandler partitionHandler(
MessagingTemplate messagingTemplate,
JobExplorer jobExplorer) {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("workerStep");
handler.setGridSize(4);
handler.setMessagingOperations(messagingTemplate);
handler.setJobExplorer(jobExplorer);
// worker の完了待ちタイムアウト
handler.setPollInterval(5000L);
return handler;
}
}// worker 側の設定
@Configuration
public class WorkerConfiguration {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public WorkerConfiguration(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// worker はパーティションを受け取り Step を実行
@Bean
public Step workerStep(ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("workerStep", jobRepository)
.<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
// パーティションパラメーターを受け取るため @StepScope を設定
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// パーティション境界を使う Reader
@Bean
@StepScope
public JdbcCursorItemReader<OrderRecord> partitionedReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcCursorItemReaderBuilder<OrderRecord>()
.name("partitionedOrderReader")
.dataSource(dataSource)
.sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
.preparedStatementSetter(ps -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.rowMapper(new OrderRecordRowMapper())
.build();
}
}Spring Bootの面接対策はできていますか?
インタラクティブなシミュレーター、flashcards、技術テストで練習しましょう。
フォールトトレランスとエラー回復
質問 7: Spring Batch が提供するフォールトトレランス機構は何ですか
Spring Batch は補完しあう三つの機構を備えます。skip(失敗アイテムを無視)、retry(自動的にリトライ)、restart(失敗ジョブを再開)です。これらは Step レベルで設定します。
// 包括的なフォールトトレランス設定
@Configuration
public class FaultTolerantStepConfig {
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<DataRecord> reader,
ItemProcessor<DataRecord, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
SkipPolicy customSkipPolicy) {
return new StepBuilder("faultTolerantStep", jobRepository)
.<DataRecord, ProcessedRecord>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// フォールトトレラントモードを有効化
.faultTolerant()
// SKIP: 検証エラーを最大 10 件まで無視
.skipLimit(10)
.skip(ValidationException.class)
.skip(DataIntegrityViolationException.class)
// 一部のエラーは決してスキップしない
.noSkip(FatalBatchException.class)
// RETRY: 一時的なエラーをリトライ
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(DeadlockLoserDataAccessException.class)
// リトライ間の指数バックオフ
.backOffPolicy(exponentialBackOffPolicy())
// スキップを記録するリスナー
.listener(skipListener())
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000); // 1 秒
policy.setMultiplier(2.0); // リトライごとに倍
policy.setMaxInterval(10000); // 最大 10 秒
return policy;
}
@Bean
public SkipListener<DataRecord, ProcessedRecord> skipListener() {
return new SkipListener<>() {
@Override
public void onSkipInRead(Throwable t) {
// 読み取り不能アイテムをログ
}
@Override
public void onSkipInProcess(DataRecord item, Throwable t) {
// 処理に失敗したアイテムをログ
}
@Override
public void onSkipInWrite(ProcessedRecord item, Throwable t) {
// 書き込みに失敗したアイテムをログ
}
};
}
}リトライは一時的なエラー(ネットワークタイムアウト、DB デッドロック)に有効です。スキップは個別のデータエラーで全体処理を止めない場合に向きます。
質問 8: カスタム SkipPolicy を実装するには
カスタム SkipPolicy により細かい判断が可能になります。例外型、エラー件数、業務固有の基準でスキップできます。
// 高度な業務ロジックを持つ SkipPolicy
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 100;
private static final double MAX_SKIP_PERCENTAGE = 0.05; // 最大 5%
private final AtomicInteger totalProcessed = new AtomicInteger(0);
private final AtomicInteger skipCount = new AtomicInteger(0);
@Override
public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
// 致命的エラーは決してスキップしない
if (exception instanceof FatalBatchException
|| exception instanceof OutOfMemoryError) {
return false;
}
// スキップ数の絶対上限
if (skipCountSoFar >= MAX_SKIP_COUNT) {
return false; // ジョブを停止
}
// パーセンテージ上限
int total = totalProcessed.get();
if (total > 1000) { // ウォームアップ後にのみ適用
double skipPercentage = (double) skipCountSoFar / total;
if (skipPercentage > MAX_SKIP_PERCENTAGE) {
return false; // 比率としてエラーが多すぎる
}
}
// 検証・データエラーをスキップ
return exception instanceof ValidationException
|| exception instanceof DataFormatException
|| exception instanceof IllegalArgumentException;
}
// リスナーから呼ばれ進捗を追跡
public void incrementProcessed() {
totalProcessed.incrementAndGet();
}
}質問 9: 失敗ジョブのリスタートはどのように動きますか
JobRepository は実行ごとの状態を保存します。リスタート時、Spring Batch は最後にコミットされたチャンクを特定し、そこから再開します。処理済みアイテムは再処理されません。
// ジョブリスタート管理サービス
@Service
public class JobRestartService {
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final Job importJob;
public JobRestartService(JobLauncher jobLauncher,
JobExplorer jobExplorer,
JobRepository jobRepository,
@Qualifier("importJob") Job importJob) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.importJob = importJob;
}
public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
// 失敗した実行を取得
JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);
if (failedExecution == null) {
throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
}
// ジョブが再開可能か確認
if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
throw new IllegalStateException("Only FAILED jobs can be restarted");
}
// 元の実行と同じパラメーターを使用
JobParameters originalParams = failedExecution.getJobParameters();
// ジョブを再起動 - 最終チェックポイントから自動再開
return jobLauncher.run(importJob, originalParams);
}
public List<JobExecution> findRestartableJobs() {
// 再開未実施の FAILED 実行を一覧化
return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.FAILED)
.filter(this::isRestartable)
.toList();
}
private boolean isRestartable(JobExecution execution) {
// より新しい成功実行が存在しないことを確認
JobInstance instance = execution.getJobInstance();
return jobExplorer.getJobExecutions(instance).stream()
.noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
}
}ジョブは JobParameters が同一でなければ再開できません。パラメーターを変更すると新しいジョブインスタンスとなり、進捗履歴が失われます。
スケーリングと最適化
質問 10: 利用可能なスケーリング戦略は
Spring Batch は四つの戦略を提供します。multi-threaded step(複数スレッドで並列読み込み)、parallel steps(独立 Step の並列実行)、remote chunking(分散処理)、partitioning(データの分散)です。
// マルチスレッド Step: 複数スレッドが同じデータセットを処理
@Configuration
public class MultiThreadedStepConfig {
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<Record> reader,
ItemProcessor<Record, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
TaskExecutor taskExecutor) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Record, ProcessedRecord>chunk(100, transactionManager)
// 注意: reader はスレッドセーフでなければならない
.reader(synchronizedReader(reader))
.processor(processor)
.writer(writer)
// 4 スレッドが並列にチャンクを処理
.taskExecutor(taskExecutor)
.throttleLimit(4)
.build();
}
// reader をスレッドセーフにするラッパー
private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate((ItemStreamReader<Record>) reader);
return syncReader;
}
}// 独立 Step を並列実行
@Configuration
public class ParallelStepsConfig {
@Bean
public Job parallelJob(JobRepository jobRepository,
Step loadCustomersStep,
Step loadProductsStep,
Step loadOrdersStep,
Step processDataStep) {
// 並列フロー: customers と products を同時にロード
Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
.start(loadCustomersStep)
.build();
Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
.start(loadProductsStep)
.build();
Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
.start(loadOrdersStep)
.build();
// Split がフローを並列実行
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("parallelLoadFlow")
.split(new SimpleAsyncTaskExecutor())
.add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
.build())
// 並列ロード後に逐次処理
.next(processDataStep)
.build()
.build();
}
}マルチスレッド化は reader を同期できる場合に向きます。大量データではパーティショニングが望ましく、各パーティションが独自の reader を持ち競合しません。
質問 11: ジョブのパフォーマンスを監視するには
Spring Batch はリスナーと JobRepository を介してメトリクスを公開します。Micrometer 連携により Prometheus、Grafana その他の監視システムへエクスポート可能です。
// Micrometer による監視設定
@Configuration
public class BatchMetricsConfig {
private final MeterRegistry meterRegistry;
public BatchMetricsConfig(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Bean
public JobExecutionListener metricsJobListener() {
return new JobExecutionListener() {
private Timer.Sample jobTimer;
@Override
public void beforeJob(JobExecution jobExecution) {
// ジョブ時間タイマーを開始
jobTimer = Timer.start(meterRegistry);
Counter.builder("batch.job.started")
.tag("job", jobExecution.getJobInstance().getJobName())
.register(meterRegistry)
.increment();
}
@Override
public void afterJob(JobExecution jobExecution) {
// 合計時間を記録
jobTimer.stop(Timer.builder("batch.job.duration")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry));
// ステータス別ジョブカウンタ
Counter.builder("batch.job.completed")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry)
.increment();
}
};
}
@Bean
public StepExecutionListener metricsStepListener() {
return new StepExecutionListener() {
@Override
public void afterStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
String stepName = stepExecution.getStepName();
// スループットメトリクス
Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
return null;
}
};
}
}質問 12: パーティショニングでありがちな落とし穴は
頻出する誤りには次があります。アンバランスなパーティション(1 パーティションに 90% のデータが集中)、スレッドセーフでない reader、パーティション間の状態管理ミスです。
// 実際に負荷を均すパーティショナー
@Component
public class BalancedPartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// 処理対象アイテム総数を取得
Integer totalCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);
if (totalCount == null || totalCount == 0) {
return Map.of();
}
// パーティションごとの目標サイズを算出
int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);
Map<String, ExecutionContext> partitions = new HashMap<>();
// OFFSET/LIMIT でバランスの取れたパーティションを生成
// 範囲指定よりコストは高いが均衡を保証
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("offset", i * itemsPerPartition);
context.putInt("limit", itemsPerPartition);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}
// OffsetBasedReader.java
// オフセットベースのパーティショニングと互換のリーダー
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {
private final JdbcTemplate jdbcTemplate;
private Iterator<OrderRecord> iterator;
@Value("#{stepExecutionContext['offset']}")
private int offset;
@Value("#{stepExecutionContext['limit']}")
private int limit;
public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void open(ExecutionContext executionContext) {
// このパーティションに割り当てられた範囲だけをロード
List<OrderRecord> records = jdbcTemplate.query(
"SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
new OrderRecordRowMapper(),
limit, offset
);
this.iterator = records.iterator();
}
@Override
public OrderRecord read() {
return iterator.hasNext() ? iterator.next() : null;
}
@Override
public void update(ExecutionContext executionContext) {
// 必要ならリスタート用に状態を保存
}
@Override
public void close() {
// クリーンアップ
}
}シニア向けの応用質問
質問 13: ジョブ間の依存関係をどう管理しますか
Spring Batch はジョブ間依存をネイティブには管理しません。解決策として、外部オーケストレーター(Airflow、Kubernetes CronJob)や JobExplorer を使った独自実装があります。
// ジョブ間依存管理
@Service
public class JobDependencyService {
private final JobExplorer jobExplorer;
private final JobLauncher jobLauncher;
private final Map<String, Job> jobs;
public JobDependencyService(JobExplorer jobExplorer,
JobLauncher jobLauncher,
Map<String, Job> jobs) {
this.jobExplorer = jobExplorer;
this.jobLauncher = jobLauncher;
this.jobs = jobs;
}
public JobExecution runWithDependencies(String jobName,
JobParameters params,
List<String> dependsOn) throws Exception {
// すべての依存が成功しているか確認
for (String dependency : dependsOn) {
if (!hasSuccessfulExecution(dependency, params)) {
throw new JobExecutionException(
"Dependency not satisfied: " + dependency);
}
}
Job job = jobs.get(jobName);
if (job == null) {
throw new IllegalArgumentException("Unknown job: " + jobName);
}
return jobLauncher.run(job, params);
}
private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
// 同じ業務パラメーターで COMPLETED になった実行を探す
return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
.anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
}
private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
// 業務パラメーターを比較(実行タイムスタンプは無視)
String actualDate = actual.getString("businessDate");
String expectedDate = expected.getString("businessDate");
return Objects.equals(actualDate, expectedDate);
}
}質問 14: Spring Batch ジョブを効果的にテストするには
Spring Batch ジョブのテストは階層的に行います。コンポーネント単位のユニットテスト(reader, processor, writer)、Step の統合テスト、ジョブ全体のエンドツーエンドテストです。
// Processor のユニットテスト
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {
@Mock
private PricingService pricingService;
@Mock
private ValidationService validationService;
@InjectMocks
private OrderItemProcessor processor;
@Test
void shouldProcessValidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(true);
when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));
// When
ProcessedOrder result = processor.process(input);
// Then
assertThat(result).isNotNull();
assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
}
@Test
void shouldFilterInvalidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(false);
// When
ProcessedOrder result = processor.process(input);
// Then - null はフィルタを意味
assertThat(result).isNull();
verify(pricingService, never()).calculatePrice(any());
}
}// ジョブ全体の統合テスト
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JobRepositoryTestUtils jobRepositoryTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeEach
void setup() {
// テスト間でメタデータを掃除
jobRepositoryTestUtils.removeJobExecutions();
// テストデータを初期化
jdbcTemplate.execute("DELETE FROM processed_orders");
jdbcTemplate.execute("DELETE FROM orders");
}
@Test
void shouldCompleteJobSuccessfully() throws Exception {
// Given - テストデータ
insertTestOrders(100);
// When
JobParameters params = new JobParametersBuilder()
.addLocalDate("businessDate", LocalDate.now())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// Then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
assertThat(countProcessedOrders()).isEqualTo(100);
}
@Test
void shouldHandleEmptyDataset() throws Exception {
// Given - データなし
// When
JobExecution execution = jobLauncherTestUtils.launchJob();
// Then - データなしでもジョブは成功
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
@Test
void shouldRestartFromFailurePoint() throws Exception {
// Given - 処理中のエラーをシミュレート
insertTestOrders(100);
insertPoisonOrder(50); // エラー発生
// When - 1 回目の実行は失敗
JobExecution firstExecution = jobLauncherTestUtils.launchJob();
assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);
// データを修正
removePoisonOrder(50);
// When - リスタート
JobExecution restartExecution = jobLauncherTestUtils.launchJob(
firstExecution.getJobParameters());
// Then - 失敗地点から再開
assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
private void insertTestOrders(int count) {
for (int i = 1; i <= count; i++) {
jdbcTemplate.update(
"INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
i, i * 10, BigDecimal.valueOf(i * 10));
}
}
private int countProcessedOrders() {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders", Integer.class);
}
}質問 15: データベースの書き込み性能をどう最適化しますか
書き込みはしばしばボトルネックになります。最適化として、JDBC のバッチインサート、ロード中の制約無効化、ステージングテーブルの活用が挙げられます。
// 大量データ向けに最適化された Writer
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
private final DataSource dataSource;
public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
this.jdbcTemplate = jdbcTemplate;
this.dataSource = dataSource;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
List<? extends ProcessedOrder> items = chunk.getItems();
if (items.isEmpty()) {
return;
}
// バッチで PreparedStatement を使用
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
"VALUES (?, ?, ?, ?)")) {
for (ProcessedOrder order : items) {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
ps.addBatch();
}
// すべての INSERT を 1 回のネットワーク操作で実行
ps.executeBatch();
}
}
}
// StagingTableWriter.java
// 超大量データ向けのステージングテーブルパターン
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {
private final JdbcTemplate jdbcTemplate;
private String stagingTable;
public StagingTableWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void beforeStep(StepExecution stepExecution) {
// この Step 用に一時テーブルを作成
stagingTable = "staging_orders_" + stepExecution.getId();
jdbcTemplate.execute(
"CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// ステージングテーブルへ書き込み(FK 制約なし)
String sql = "INSERT INTO " + stagingTable +
" (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
});
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
// 最終テーブルへバルクコピー
jdbcTemplate.execute(
"INSERT INTO processed_orders SELECT * FROM " + stagingTable);
}
// ステージングテーブルを掃除
jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
return stepExecution.getExitStatus();
}
}まとめ
技術面接で Spring Batch 5 を制するには、内部メカニズムへの深い理解が必要です。
✅ アーキテクチャ: Job → Step → Chunk(Reader, Processor, Writer)
✅ チャンク処理: サイズ調整、ライフサイクル、トランザクション
✅ パーティショニング: ローカル vs リモート、パーティションのバランス
✅ フォールトトレランス: skip, retry, restart と適切なポリシー
✅ スケーリング: マルチスレッド、parallel steps、remote chunking
✅ テスト: ユニット、統合、エンドツーエンド
✅ 最適化: バッチ書き込み、ステージングテーブル、監視
応用質問では、データ量、時間制約、エラー許容度、利用可能なインフラといった文脈に基づくアーキテクチャ判断を説明する力が問われます。
今すぐ練習を始めましょう!
面接シミュレーターと技術テストで知識をテストしましょう。
タグ
共有
関連記事

Spring Modulith: モジュラーモノリスアーキテクチャ解説
Spring Modulith で Java のモジュラーモノリスを構築する方法を学びます。アーキテクチャ、モジュール、非同期イベント、Spring Boot 3 のコード例によるテスト。

Spring Boot 面接: トランザクション伝播の解説
Spring Boot のトランザクション伝播をマスターします。REQUIRED、REQUIRES_NEW、NESTED など。コード例と典型的な落とし穴を含む 12 の面接質問。

Spring Security 6: JWT認証の完全ガイド
Spring Security 6でJWT認証を実装するための実践ガイド。設定、トークン生成、検証、セキュリティのベストプラクティスを解説します。