Wawancara Spring Batch 5: Partisi, Chunk, dan Toleransi Kegagalan
Kuasai wawancara Spring Batch 5: 15 pertanyaan penting tentang partisi, pemrosesan chunk, dan toleransi kegagalan dengan contoh kode Java 21.

Spring Batch 5 menjadi tulang punggung pemrosesan data berskala besar di ekosistem Spring. Wawancara teknis menilai kemampuan merancang job yang andal, scalable, dan toleran terhadap kegagalan. Penguasaan partisi, pemrosesan berbasis chunk, dan mekanisme toleransi kegagalan membedakan developer senior.
Rekruter menguji pemahaman mendalam: mengapa memilih partitioning daripada remote chunking? Bagaimana menentukan ukuran chunk yang tepat? Keputusan arsitektur ini menunjukkan pengalaman produksi yang sebenarnya.
Arsitektur inti Spring Batch 5
Pertanyaan 1: Apa komponen utama Spring Batch?
Arsitektur Spring Batch terdiri atas tiga lapisan: aplikasi (job dan kode bisnis), Batch Core (kelas runtime untuk meluncurkan dan mengontrol job), dan infrastruktur (reader, writer, dan layanan umum seperti RetryTemplate).
// Konfigurasi job Spring Batch 5 dengan Java 21
@Configuration
public class BatchJobConfig {
// JobRepository menyimpan metadata eksekusi
// Memungkinkan restart dan monitoring job
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public BatchJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Job mengenkapsulasi proses batch lengkap
// Terdiri dari satu atau lebih Step yang dieksekusi berurutan
@Bean
public Job importUserJob(Step processUsersStep, Step cleanupStep) {
return new JobBuilder("importUserJob", jobRepository)
.start(processUsersStep) // Step pemrosesan utama
.next(cleanupStep) // Step pembersihan
.build();
}
// Step merepresentasikan unit kerja independen
// Dua model: Tasklet (tugas tunggal) atau Chunk (pemrosesan iteratif)
@Bean
public Step processUsersStep(ItemReader<UserRecord> reader,
ItemProcessor<UserRecord, User> processor,
ItemWriter<User> writer) {
return new StepBuilder("processUsersStep", jobRepository)
.<UserRecord, User>chunk(100, transactionManager) // Commit setiap 100 item
.reader(reader) // Membaca data sumber
.processor(processor) // Mentransformasi setiap item
.writer(writer) // Menulis dalam batch 100
.build();
}
}JobRepository mempersistensi status eksekusi ke database. Persistensi ini memungkinkan job yang gagal dilanjutkan tepat di titik berhenti, tanpa memproses ulang data yang sudah dikomit.
Pertanyaan 2: Apa perbedaan antara Tasklet dan pemrosesan berbasis Chunk?
Tasklet menjalankan aksi tunggal yang tidak iteratif: penghapusan file, panggilan stored procedure, pengiriman email notifikasi. Chunk memproses volume besar dengan membagi data menjadi batch yang dapat dikelola.
// Tasklet: aksi tunggal tanpa iterasi
@Component
public class CleanupTasklet implements Tasklet {
private final Path tempDirectory = Path.of("/tmp/batch-work");
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// Menghapus semua file sementara dari pemrosesan
try (var files = Files.walk(tempDirectory)) {
files.filter(Files::isRegularFile)
.forEach(this::deleteQuietly);
}
// FINISHED menandakan tasklet selesai bekerja
// CONTINUABLE akan mengulang eksekusi (berguna untuk polling)
return RepeatStatus.FINISHED;
}
private void deleteQuietly(Path file) {
try {
Files.delete(file);
} catch (IOException e) {
// Catat dan lanjutkan - jangan gagalkan job karena satu file
}
}
}// Pemrosesan chunk: penanganan volume tinggi
@Configuration
public class ChunkProcessingConfig {
@Bean
public Step processOrdersStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("processOrdersStep", jobRepository)
// Chunk 500: baca 500 item, proses, tulis, lalu commit
.<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Listener untuk memantau progres
.listener(new ChunkProgressListener())
.build();
}
}Pemrosesan berbasis chunk memberi keuntungan kritis: manajemen memori yang dioptimalkan (hanya chunk saat ini di memori), transaksi granular (commit per chunk), dan pemulihan kegagalan pada chunk terakhir yang dikomit.
Pendalaman pemrosesan berbasis Chunk
Pertanyaan 3: Bagaimana siklus hidup chunk bekerja?
Setiap chunk mengikuti siklus yang presisi: membaca item satu per satu sampai mencapai ukuran yang dikonfigurasi, memproses tiap item secara individual, lalu menulis grup. Sebuah transaksi membungkus seluruh chunk.
// ItemReader: membaca satu item per panggilan
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {
// @StepScope: instance baru per eksekusi step
// Memungkinkan injeksi parameter job dinamis
@Value("#{jobParameters['startDate']}")
private LocalDate startDate;
private Iterator<OrderRecord> orderIterator;
@BeforeStep
public void initializeReader(StepExecution stepExecution) {
// Memuat data saat step dimulai
List<OrderRecord> orders = fetchOrdersFromDate(startDate);
this.orderIterator = orders.iterator();
}
@Override
public OrderRecord read() {
// Mengembalikan null untuk menandai akhir data
// Spring Batch memanggil read() sampai menerima null
if (orderIterator.hasNext()) {
return orderIterator.next();
}
return null; // Akhir dataset
}
private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
// Mengambil dari sumber data
return List.of(); // Implementasi sebenarnya
}
}// ItemProcessor: mentransformasi tiap item secara individual
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {
private final PricingService pricingService;
private final ValidationService validationService;
public OrderItemProcessor(PricingService pricingService,
ValidationService validationService) {
this.pricingService = pricingService;
this.validationService = validationService;
}
@Override
public ProcessedOrder process(OrderRecord item) {
// Mengembalikan null memfilter item (tidak akan ditulis)
if (!validationService.isValid(item)) {
return null; // Item difilter
}
// Transformasi bisnis
BigDecimal finalPrice = pricingService.calculatePrice(item);
return new ProcessedOrder(
item.orderId(),
item.customerId(),
finalPrice,
LocalDateTime.now()
);
}
}// ItemWriter: menulis seluruh chunk dalam satu operasi
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
public OrderItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Chunk berisi semua item yang telah diproses
// Penulisan batch untuk performa optimal
List<? extends ProcessedOrder> items = chunk.getItems();
jdbcTemplate.batchUpdate(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
items,
items.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
}
);
}
}Jika terjadi exception saat pemrosesan chunk, transaksi di-rollback. Job kemudian dapat dilanjutkan dari chunk tersebut menggunakan metadata yang tersimpan di JobRepository.
Pertanyaan 4: Bagaimana memilih ukuran chunk yang optimal?
Ukuran chunk berdampak langsung pada performa dan konsumsi memori. Chunk yang terlalu kecil memperbanyak commit (overhead). Chunk yang terlalu besar memakan memori berlebih dan memperpanjang rollback saat gagal.
// Konfigurasi dinamis ukuran chunk
@Configuration
public class ChunkSizingConfig {
// Default yang masuk akal untuk sebagian besar kasus
private static final int DEFAULT_CHUNK_SIZE = 100;
// Untuk item ringan (sedikit field)
private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;
// Untuk item berat (blob, dokumen)
private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;
@Bean
public Step processLightDataStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<LightRecord> reader,
ItemWriter<LightRecord> writer) {
return new StepBuilder("processLightDataStep", jobRepository)
// Item ringan: chunk lebih besar untuk lebih sedikit commit
.<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Step processDocumentsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<Document> reader,
ItemProcessor<Document, ProcessedDocument> processor,
ItemWriter<ProcessedDocument> writer) {
return new StepBuilder("processDocumentsStep", jobRepository)
// Dokumen berat: chunk lebih kecil untuk membatasi memori
.<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}Mulai dengan 100 item per chunk, lalu sesuaikan berdasarkan metrik: waktu commit, penggunaan memori, dan durasi rollback. Gunakan listener untuk memantau dan menemukan titik optimal.
Partitioning untuk pemrosesan paralel
Pertanyaan 5: Apa itu partitioning dan kapan menggunakannya?
Partitioning membagi dataset menjadi partisi independen yang diproses paralel. Setiap partisi berjalan di thread sendiri (lokal) atau pada worker remote. Pendekatan ini melipatgandakan throughput tanpa mengorbankan kemampuan restart.
// Konfigurasi job berpartisi
@Configuration
public class PartitionedJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public PartitionedJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
@Bean
public Job partitionedImportJob(Step partitionedStep) {
return new JobBuilder("partitionedImportJob", jobRepository)
.start(partitionedStep)
.build();
}
// Manager step: mengorkestrasi partisi
@Bean
public Step partitionedStep(Partitioner partitioner,
Step workerStep,
TaskExecutor taskExecutor) {
return new StepBuilder("partitionedStep", jobRepository)
// Membagi pekerjaan melalui Partitioner
.partitioner("workerStep", partitioner)
// Step yang dieksekusi untuk setiap partisi
.step(workerStep)
// 8 thread paralel
.taskExecutor(taskExecutor)
// Jumlah partisi yang dibuat
.gridSize(8)
.build();
}
// TaskExecutor untuk eksekusi paralel
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("batch-partition-");
executor.initialize();
return executor;
}
}// Partitioner berbasis rentang ID
@Component
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Mengambil batas dataset
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);
if (minId == null || maxId == null) {
return Map.of(); // Tidak ada data untuk diproses
}
// Menghitung ukuran tiap partisi
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long start = minId + (i * range);
long end = Math.min(start + range - 1, maxId);
// Tiap partisi menerima batasnya
context.putLong("minId", start);
context.putLong("maxId", end);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}Partitioning cocok untuk dataset besar dengan item independen. Partisi harus seimbang agar partisi lambat tidak memperlambat seluruh job.
Pertanyaan 6: Apa perbedaan partitioning lokal dan remote?
Partitioning lokal menjalankan semua partisi di JVM yang sama dengan thread pool. Partitioning remote mendistribusikan partisi ke beberapa JVM (worker) melalui middleware messaging.
// Konfigurasi partitioning remote dengan messaging
@Configuration
public class RemotePartitioningConfig {
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
MessageChannelPartitionHandler partitionHandler) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
// Handler yang berkomunikasi dengan worker remote
.partitionHandler(partitionHandler)
.build();
}
// PartitionHandler mengirim ExecutionContext ke worker
@Bean
public MessageChannelPartitionHandler partitionHandler(
MessagingTemplate messagingTemplate,
JobExplorer jobExplorer) {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("workerStep");
handler.setGridSize(4);
handler.setMessagingOperations(messagingTemplate);
handler.setJobExplorer(jobExplorer);
// Timeout menunggu worker selesai
handler.setPollInterval(5000L);
return handler;
}
}// Konfigurasi sisi worker
@Configuration
public class WorkerConfiguration {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public WorkerConfiguration(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Worker menerima partisi dan menjalankan step
@Bean
public Step workerStep(ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("workerStep", jobRepository)
.<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
// Reader dengan @StepScope untuk menerima parameter partisi
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// Reader yang memakai batas partisi
@Bean
@StepScope
public JdbcCursorItemReader<OrderRecord> partitionedReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcCursorItemReaderBuilder<OrderRecord>()
.name("partitionedOrderReader")
.dataSource(dataSource)
.sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
.preparedStatementSetter(ps -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.rowMapper(new OrderRecordRowMapper())
.build();
}
}Siap menguasai wawancara Spring Boot Anda?
Berlatih dengan simulator interaktif, flashcards, dan tes teknis kami.
Toleransi kegagalan dan pemulihan kesalahan
Pertanyaan 7: Mekanisme toleransi kegagalan apa yang ditawarkan Spring Batch?
Spring Batch menawarkan tiga mekanisme komplementer: skip (mengabaikan item yang gagal), retry (mencoba ulang otomatis), dan restart (melanjutkan job yang gagal). Mekanisme ini dikonfigurasi di tingkat step.
// Konfigurasi toleransi kegagalan lengkap
@Configuration
public class FaultTolerantStepConfig {
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<DataRecord> reader,
ItemProcessor<DataRecord, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
SkipPolicy customSkipPolicy) {
return new StepBuilder("faultTolerantStep", jobRepository)
.<DataRecord, ProcessedRecord>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Mengaktifkan mode toleran terhadap kegagalan
.faultTolerant()
// SKIP: mengabaikan hingga 10 error validasi
.skipLimit(10)
.skip(ValidationException.class)
.skip(DataIntegrityViolationException.class)
// Beberapa error tidak boleh dilewati
.noSkip(FatalBatchException.class)
// RETRY: mencoba ulang error transien
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(DeadlockLoserDataAccessException.class)
// Backoff eksponensial antar retry
.backOffPolicy(exponentialBackOffPolicy())
// Listener untuk mencatat skip
.listener(skipListener())
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000); // 1 detik
policy.setMultiplier(2.0); // Berlipat setiap retry
policy.setMaxInterval(10000); // Maksimum 10 detik
return policy;
}
@Bean
public SkipListener<DataRecord, ProcessedRecord> skipListener() {
return new SkipListener<>() {
@Override
public void onSkipInRead(Throwable t) {
// Catat item yang tidak terbaca
}
@Override
public void onSkipInProcess(DataRecord item, Throwable t) {
// Catat item yang gagal saat diproses
}
@Override
public void onSkipInWrite(ProcessedRecord item, Throwable t) {
// Catat item yang gagal saat ditulis
}
};
}
}Retry cocok untuk error transien (timeout jaringan, deadlock database). Skip cocok untuk error data individual yang tidak seharusnya memblokir pemrosesan keseluruhan.
Pertanyaan 8: Bagaimana mengimplementasikan SkipPolicy kustom?
SkipPolicy kustom memungkinkan logika keputusan halus: skip berdasarkan tipe exception, jumlah error, atau kriteria bisnis spesifik.
// SkipPolicy dengan logika bisnis lanjutan
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 100;
private static final double MAX_SKIP_PERCENTAGE = 0.05; // 5% maksimum
private final AtomicInteger totalProcessed = new AtomicInteger(0);
private final AtomicInteger skipCount = new AtomicInteger(0);
@Override
public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
// Jangan pernah melewatkan error fatal
if (exception instanceof FatalBatchException
|| exception instanceof OutOfMemoryError) {
return false;
}
// Batas absolut skip
if (skipCountSoFar >= MAX_SKIP_COUNT) {
return false; // Hentikan job
}
// Batas persentase
int total = totalProcessed.get();
if (total > 1000) { // Terapkan hanya setelah warmup
double skipPercentage = (double) skipCountSoFar / total;
if (skipPercentage > MAX_SKIP_PERCENTAGE) {
return false; // Terlalu banyak error secara proporsional
}
}
// Lewati error validasi dan data
return exception instanceof ValidationException
|| exception instanceof DataFormatException
|| exception instanceof IllegalArgumentException;
}
// Dipanggil oleh listener untuk memantau progres
public void incrementProcessed() {
totalProcessed.incrementAndGet();
}
}Pertanyaan 9: Bagaimana restart job yang gagal bekerja?
JobRepository menyimpan status setiap eksekusi. Saat restart, Spring Batch mengidentifikasi chunk terakhir yang dikomit dan melanjutkan dari titik tersebut. Item yang sudah berhasil diproses tidak diproses ulang.
// Layanan manajemen restart job
@Service
public class JobRestartService {
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final Job importJob;
public JobRestartService(JobLauncher jobLauncher,
JobExplorer jobExplorer,
JobRepository jobRepository,
@Qualifier("importJob") Job importJob) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.importJob = importJob;
}
public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
// Mengambil eksekusi yang gagal
JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);
if (failedExecution == null) {
throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
}
// Memverifikasi job dapat di-restart
if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
throw new IllegalStateException("Only FAILED jobs can be restarted");
}
// Menggunakan parameter yang sama dengan eksekusi awal
JobParameters originalParams = failedExecution.getJobParameters();
// Meluncurkan ulang job - otomatis melanjutkan dari checkpoint terakhir
return jobLauncher.run(importJob, originalParams);
}
public List<JobExecution> findRestartableJobs() {
// Daftar semua eksekusi FAILED yang belum di-restart
return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.FAILED)
.filter(this::isRestartable)
.toList();
}
private boolean isRestartable(JobExecution execution) {
// Memverifikasi tidak ada eksekusi sukses yang lebih baru
JobInstance instance = execution.getJobInstance();
return jobExplorer.getJobExecutions(instance).stream()
.noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
}
}Job hanya dapat di-restart jika JobParameters identik. Mengubah parameter membuat instance job baru dan menghilangkan riwayat progres.
Penskalaan dan optimisasi
Pertanyaan 10: Strategi penskalaan apa yang tersedia?
Spring Batch menawarkan empat strategi: multi-threaded step (banyak thread membaca paralel), parallel steps (step independen paralel), remote chunking (pemrosesan terdistribusi), dan partitioning (data terdistribusi).
// Step multi-thread: banyak thread memproses dataset yang sama
@Configuration
public class MultiThreadedStepConfig {
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<Record> reader,
ItemProcessor<Record, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
TaskExecutor taskExecutor) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Record, ProcessedRecord>chunk(100, transactionManager)
// PERHATIAN: reader harus thread-safe
.reader(synchronizedReader(reader))
.processor(processor)
.writer(writer)
// 4 thread memproses chunk paralel
.taskExecutor(taskExecutor)
.throttleLimit(4)
.build();
}
// Wrapper untuk membuat reader thread-safe
private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate((ItemStreamReader<Record>) reader);
return syncReader;
}
}// Eksekusi step independen secara paralel
@Configuration
public class ParallelStepsConfig {
@Bean
public Job parallelJob(JobRepository jobRepository,
Step loadCustomersStep,
Step loadProductsStep,
Step loadOrdersStep,
Step processDataStep) {
// Flow paralel: customers dan products dimuat bersamaan
Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
.start(loadCustomersStep)
.build();
Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
.start(loadProductsStep)
.build();
Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
.start(loadOrdersStep)
.build();
// Split menjalankan flow secara paralel
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("parallelLoadFlow")
.split(new SimpleAsyncTaskExecutor())
.add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
.build())
// Setelah load paralel, pemrosesan berurutan
.next(processDataStep)
.build()
.build();
}
}Multi-threading cocok jika reader bisa disinkronkan. Partitioning lebih disukai untuk volume besar karena setiap partisi punya reader sendiri tanpa kontensi.
Pertanyaan 11: Bagaimana memantau performa job?
Spring Batch mengekspos metrik melalui listener dan JobRepository. Integrasi dengan Micrometer memungkinkan ekspor ke Prometheus, Grafana, atau sistem monitoring lain.
// Konfigurasi monitoring dengan Micrometer
@Configuration
public class BatchMetricsConfig {
private final MeterRegistry meterRegistry;
public BatchMetricsConfig(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Bean
public JobExecutionListener metricsJobListener() {
return new JobExecutionListener() {
private Timer.Sample jobTimer;
@Override
public void beforeJob(JobExecution jobExecution) {
// Memulai timer durasi job
jobTimer = Timer.start(meterRegistry);
Counter.builder("batch.job.started")
.tag("job", jobExecution.getJobInstance().getJobName())
.register(meterRegistry)
.increment();
}
@Override
public void afterJob(JobExecution jobExecution) {
// Mencatat durasi total
jobTimer.stop(Timer.builder("batch.job.duration")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry));
// Counter job per status
Counter.builder("batch.job.completed")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry)
.increment();
}
};
}
@Bean
public StepExecutionListener metricsStepListener() {
return new StepExecutionListener() {
@Override
public void afterStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
String stepName = stepExecution.getStepName();
// Metrik throughput
Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
return null;
}
};
}
}Pertanyaan 12: Apa jebakan umum saat partitioning?
Kesalahan yang sering muncul: partisi tidak seimbang (satu partisi memuat 90% data), reader yang tidak thread-safe, dan manajemen state yang salah antar partisi.
// Partitioner yang benar-benar menyeimbangkan beban
@Component
public class BalancedPartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Menghitung total item yang akan diproses
Integer totalCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);
if (totalCount == null || totalCount == 0) {
return Map.of();
}
// Menghitung target ukuran per partisi
int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);
Map<String, ExecutionContext> partitions = new HashMap<>();
// Memakai OFFSET/LIMIT untuk partisi seimbang
// Lebih mahal daripada range tetapi menjamin keseimbangan
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("offset", i * itemsPerPartition);
context.putInt("limit", itemsPerPartition);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}
// OffsetBasedReader.java
// Reader kompatibel dengan partitioning berbasis offset
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {
private final JdbcTemplate jdbcTemplate;
private Iterator<OrderRecord> iterator;
@Value("#{stepExecutionContext['offset']}")
private int offset;
@Value("#{stepExecutionContext['limit']}")
private int limit;
public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void open(ExecutionContext executionContext) {
// Memuat persis bagian yang ditugaskan ke partisi ini
List<OrderRecord> records = jdbcTemplate.query(
"SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
new OrderRecordRowMapper(),
limit, offset
);
this.iterator = records.iterator();
}
@Override
public OrderRecord read() {
return iterator.hasNext() ? iterator.next() : null;
}
@Override
public void update(ExecutionContext executionContext) {
// Penyimpanan state untuk restart bila perlu
}
@Override
public void close() {
// Pembersihan
}
}Pertanyaan lanjutan untuk senior
Pertanyaan 13: Bagaimana mengelola dependensi antar job?
Spring Batch tidak mengelola dependensi antar job secara native. Solusinya: orkestrator eksternal (Airflow, Kubernetes CronJob) atau implementasi kustom dengan JobExplorer.
// Pengelolaan dependensi antar job
@Service
public class JobDependencyService {
private final JobExplorer jobExplorer;
private final JobLauncher jobLauncher;
private final Map<String, Job> jobs;
public JobDependencyService(JobExplorer jobExplorer,
JobLauncher jobLauncher,
Map<String, Job> jobs) {
this.jobExplorer = jobExplorer;
this.jobLauncher = jobLauncher;
this.jobs = jobs;
}
public JobExecution runWithDependencies(String jobName,
JobParameters params,
List<String> dependsOn) throws Exception {
// Memverifikasi semua dependensi berhasil
for (String dependency : dependsOn) {
if (!hasSuccessfulExecution(dependency, params)) {
throw new JobExecutionException(
"Dependency not satisfied: " + dependency);
}
}
Job job = jobs.get(jobName);
if (job == null) {
throw new IllegalArgumentException("Unknown job: " + jobName);
}
return jobLauncher.run(job, params);
}
private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
// Mencari eksekusi COMPLETED dengan parameter bisnis yang sama
return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
.anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
}
private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
// Membandingkan parameter bisnis (mengabaikan timestamp eksekusi)
String actualDate = actual.getString("businessDate");
String expectedDate = expected.getString("businessDate");
return Objects.equals(actualDate, expectedDate);
}
}Pertanyaan 14: Bagaimana menguji job Spring Batch secara efektif?
Menguji job Spring Batch membutuhkan pendekatan berlapis: unit test untuk komponen (reader, processor, writer), integration test untuk step, dan end-to-end test untuk job lengkap.
// Unit test processor
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {
@Mock
private PricingService pricingService;
@Mock
private ValidationService validationService;
@InjectMocks
private OrderItemProcessor processor;
@Test
void shouldProcessValidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(true);
when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));
// When
ProcessedOrder result = processor.process(input);
// Then
assertThat(result).isNotNull();
assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
}
@Test
void shouldFilterInvalidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(false);
// When
ProcessedOrder result = processor.process(input);
// Then - null berarti difilter
assertThat(result).isNull();
verify(pricingService, never()).calculatePrice(any());
}
}// Integration test untuk job lengkap
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JobRepositoryTestUtils jobRepositoryTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeEach
void setup() {
// Membersihkan metadata antar test
jobRepositoryTestUtils.removeJobExecutions();
// Mereset data uji
jdbcTemplate.execute("DELETE FROM processed_orders");
jdbcTemplate.execute("DELETE FROM orders");
}
@Test
void shouldCompleteJobSuccessfully() throws Exception {
// Given - data uji
insertTestOrders(100);
// When
JobParameters params = new JobParametersBuilder()
.addLocalDate("businessDate", LocalDate.now())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// Then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
assertThat(countProcessedOrders()).isEqualTo(100);
}
@Test
void shouldHandleEmptyDataset() throws Exception {
// Given - tanpa data
// When
JobExecution execution = jobLauncherTestUtils.launchJob();
// Then - job berhasil meski tanpa data
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
@Test
void shouldRestartFromFailurePoint() throws Exception {
// Given - mensimulasikan error di tengah pemrosesan
insertTestOrders(100);
insertPoisonOrder(50); // Memicu error
// When - eksekusi pertama gagal
JobExecution firstExecution = jobLauncherTestUtils.launchJob();
assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);
// Memperbaiki data
removePoisonOrder(50);
// When - restart
JobExecution restartExecution = jobLauncherTestUtils.launchJob(
firstExecution.getJobParameters());
// Then - melanjutkan dari titik kegagalan
assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
private void insertTestOrders(int count) {
for (int i = 1; i <= count; i++) {
jdbcTemplate.update(
"INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
i, i * 10, BigDecimal.valueOf(i * 10));
}
}
private int countProcessedOrders() {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders", Integer.class);
}
}Pertanyaan 15: Bagaimana mengoptimalkan performa penulisan ke database?
Penulisan sering menjadi bottleneck. Optimasi meliputi: JDBC batch insert, menonaktifkan constraint saat loading, dan penggunaan staging table.
// Writer dioptimasi untuk volume besar
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
private final DataSource dataSource;
public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
this.jdbcTemplate = jdbcTemplate;
this.dataSource = dataSource;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
List<? extends ProcessedOrder> items = chunk.getItems();
if (items.isEmpty()) {
return;
}
// Memakai PreparedStatement dengan batch
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
"VALUES (?, ?, ?, ?)")) {
for (ProcessedOrder order : items) {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
ps.addBatch();
}
// Mengeksekusi semua insert dalam satu operasi jaringan
ps.executeBatch();
}
}
}
// StagingTableWriter.java
// Pola staging table untuk volume sangat besar
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {
private final JdbcTemplate jdbcTemplate;
private String stagingTable;
public StagingTableWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void beforeStep(StepExecution stepExecution) {
// Membuat tabel sementara untuk step ini
stagingTable = "staging_orders_" + stepExecution.getId();
jdbcTemplate.execute(
"CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Menulis ke staging table (tanpa constraint FK)
String sql = "INSERT INTO " + stagingTable +
" (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
});
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
// Bulk copy ke tabel akhir
jdbcTemplate.execute(
"INSERT INTO processed_orders SELECT * FROM " + stagingTable);
}
// Membersihkan staging table
jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
return stepExecution.getExitStatus();
}
}Kesimpulan
Menguasai Spring Batch 5 di wawancara teknis bersandar pada pemahaman mendalam tentang mekanisme internal:
✅ Arsitektur: Job → Step → Chunk (Reader, Processor, Writer)
✅ Pemrosesan chunk: penentuan ukuran, siklus hidup, transaksi
✅ Partitioning: lokal vs remote, penyeimbangan partisi
✅ Toleransi kegagalan: skip, retry, restart dengan kebijakan tepat
✅ Penskalaan: multi-threading, parallel steps, remote chunking
✅ Pengujian: unit, integrasi, end-to-end
✅ Optimasi: batch writes, staging table, monitoring
Pertanyaan lanjutan menguji kemampuan membenarkan keputusan arsitektur sesuai konteks: volume data, batasan waktu, toleransi kesalahan, dan infrastruktur yang tersedia.
Mulai berlatih!
Uji pengetahuan Anda dengan simulator wawancara dan tes teknis kami.
Tag
Bagikan
Artikel terkait

Spring Modulith: Arsitektur Monolit Modular Dijelaskan
Pelajari Spring Modulith untuk membangun monolit modular di Java. Arsitektur, modul, event asinkron, dan testing dengan contoh Spring Boot 3.

Wawancara Spring Boot: Propagasi Transaksi Dijelaskan
Kuasai propagasi transaksi Spring Boot: REQUIRED, REQUIRES_NEW, NESTED dan lainnya. 12 pertanyaan wawancara dengan contoh kode dan jebakan umum.

Spring Security 6: Autentikasi JWT Lengkap
Panduan praktis untuk menerapkan autentikasi JWT dengan Spring Security 6: konfigurasi, pembuatan token, validasi, dan praktik keamanan terbaik.