Wawancara Spring Batch 5: Partisi, Chunk, dan Toleransi Kegagalan

Kuasai wawancara Spring Batch 5: 15 pertanyaan penting tentang partisi, pemrosesan chunk, dan toleransi kegagalan dengan contoh kode Java 21.

Wawancara Spring Batch 5: partisi, chunk, dan toleransi kegagalan

Spring Batch 5 menjadi tulang punggung pemrosesan data berskala besar di ekosistem Spring. Wawancara teknis menilai kemampuan merancang job yang andal, scalable, dan toleran terhadap kegagalan. Penguasaan partisi, pemrosesan berbasis chunk, dan mekanisme toleransi kegagalan membedakan developer senior.

Fokus wawancara

Rekruter menguji pemahaman mendalam: mengapa memilih partitioning daripada remote chunking? Bagaimana menentukan ukuran chunk yang tepat? Keputusan arsitektur ini menunjukkan pengalaman produksi yang sebenarnya.

Arsitektur inti Spring Batch 5

Pertanyaan 1: Apa komponen utama Spring Batch?

Arsitektur Spring Batch terdiri atas tiga lapisan: aplikasi (job dan kode bisnis), Batch Core (kelas runtime untuk meluncurkan dan mengontrol job), dan infrastruktur (reader, writer, dan layanan umum seperti RetryTemplate).

BatchJobConfig.javajava
// Konfigurasi job Spring Batch 5 dengan Java 21
@Configuration
public class BatchJobConfig {

    // JobRepository menyimpan metadata eksekusi
    // Memungkinkan restart dan monitoring job
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public BatchJobConfig(JobRepository jobRepository,
                          PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Job mengenkapsulasi proses batch lengkap
    // Terdiri dari satu atau lebih Step yang dieksekusi berurutan
    @Bean
    public Job importUserJob(Step processUsersStep, Step cleanupStep) {
        return new JobBuilder("importUserJob", jobRepository)
                .start(processUsersStep)      // Step pemrosesan utama
                .next(cleanupStep)             // Step pembersihan
                .build();
    }

    // Step merepresentasikan unit kerja independen
    // Dua model: Tasklet (tugas tunggal) atau Chunk (pemrosesan iteratif)
    @Bean
    public Step processUsersStep(ItemReader<UserRecord> reader,
                                  ItemProcessor<UserRecord, User> processor,
                                  ItemWriter<User> writer) {
        return new StepBuilder("processUsersStep", jobRepository)
                .<UserRecord, User>chunk(100, transactionManager)  // Commit setiap 100 item
                .reader(reader)       // Membaca data sumber
                .processor(processor) // Mentransformasi setiap item
                .writer(writer)       // Menulis dalam batch 100
                .build();
    }
}

JobRepository mempersistensi status eksekusi ke database. Persistensi ini memungkinkan job yang gagal dilanjutkan tepat di titik berhenti, tanpa memproses ulang data yang sudah dikomit.

Pertanyaan 2: Apa perbedaan antara Tasklet dan pemrosesan berbasis Chunk?

Tasklet menjalankan aksi tunggal yang tidak iteratif: penghapusan file, panggilan stored procedure, pengiriman email notifikasi. Chunk memproses volume besar dengan membagi data menjadi batch yang dapat dikelola.

CleanupTasklet.javajava
// Tasklet: aksi tunggal tanpa iterasi
@Component
public class CleanupTasklet implements Tasklet {

    private final Path tempDirectory = Path.of("/tmp/batch-work");

    @Override
    public RepeatStatus execute(StepContribution contribution,
                                 ChunkContext chunkContext) throws Exception {
        // Menghapus semua file sementara dari pemrosesan
        try (var files = Files.walk(tempDirectory)) {
            files.filter(Files::isRegularFile)
                 .forEach(this::deleteQuietly);
        }

        // FINISHED menandakan tasklet selesai bekerja
        // CONTINUABLE akan mengulang eksekusi (berguna untuk polling)
        return RepeatStatus.FINISHED;
    }

    private void deleteQuietly(Path file) {
        try {
            Files.delete(file);
        } catch (IOException e) {
            // Catat dan lanjutkan - jangan gagalkan job karena satu file
        }
    }
}
ChunkProcessingConfig.javajava
// Pemrosesan chunk: penanganan volume tinggi
@Configuration
public class ChunkProcessingConfig {

    @Bean
    public Step processOrdersStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<OrderRecord> reader,
                                   ItemProcessor<OrderRecord, ProcessedOrder> processor,
                                   ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("processOrdersStep", jobRepository)
                // Chunk 500: baca 500 item, proses, tulis, lalu commit
                .<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Listener untuk memantau progres
                .listener(new ChunkProgressListener())
                .build();
    }
}

Pemrosesan berbasis chunk memberi keuntungan kritis: manajemen memori yang dioptimalkan (hanya chunk saat ini di memori), transaksi granular (commit per chunk), dan pemulihan kegagalan pada chunk terakhir yang dikomit.

Pendalaman pemrosesan berbasis Chunk

Pertanyaan 3: Bagaimana siklus hidup chunk bekerja?

Setiap chunk mengikuti siklus yang presisi: membaca item satu per satu sampai mencapai ukuran yang dikonfigurasi, memproses tiap item secara individual, lalu menulis grup. Sebuah transaksi membungkus seluruh chunk.

OrderItemReader.javajava
// ItemReader: membaca satu item per panggilan
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {

    // @StepScope: instance baru per eksekusi step
    // Memungkinkan injeksi parameter job dinamis
    @Value("#{jobParameters['startDate']}")
    private LocalDate startDate;

    private Iterator<OrderRecord> orderIterator;

    @BeforeStep
    public void initializeReader(StepExecution stepExecution) {
        // Memuat data saat step dimulai
        List<OrderRecord> orders = fetchOrdersFromDate(startDate);
        this.orderIterator = orders.iterator();
    }

    @Override
    public OrderRecord read() {
        // Mengembalikan null untuk menandai akhir data
        // Spring Batch memanggil read() sampai menerima null
        if (orderIterator.hasNext()) {
            return orderIterator.next();
        }
        return null;  // Akhir dataset
    }

    private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
        // Mengambil dari sumber data
        return List.of();  // Implementasi sebenarnya
    }
}
OrderItemProcessor.javajava
// ItemProcessor: mentransformasi tiap item secara individual
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {

    private final PricingService pricingService;
    private final ValidationService validationService;

    public OrderItemProcessor(PricingService pricingService,
                               ValidationService validationService) {
        this.pricingService = pricingService;
        this.validationService = validationService;
    }

    @Override
    public ProcessedOrder process(OrderRecord item) {
        // Mengembalikan null memfilter item (tidak akan ditulis)
        if (!validationService.isValid(item)) {
            return null;  // Item difilter
        }

        // Transformasi bisnis
        BigDecimal finalPrice = pricingService.calculatePrice(item);

        return new ProcessedOrder(
                item.orderId(),
                item.customerId(),
                finalPrice,
                LocalDateTime.now()
        );
    }
}
OrderItemWriter.javajava
// ItemWriter: menulis seluruh chunk dalam satu operasi
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;

    public OrderItemWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // Chunk berisi semua item yang telah diproses
        // Penulisan batch untuk performa optimal
        List<? extends ProcessedOrder> items = chunk.getItems();

        jdbcTemplate.batchUpdate(
                "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
                items,
                items.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                }
        );
    }
}

Jika terjadi exception saat pemrosesan chunk, transaksi di-rollback. Job kemudian dapat dilanjutkan dari chunk tersebut menggunakan metadata yang tersimpan di JobRepository.

Pertanyaan 4: Bagaimana memilih ukuran chunk yang optimal?

Ukuran chunk berdampak langsung pada performa dan konsumsi memori. Chunk yang terlalu kecil memperbanyak commit (overhead). Chunk yang terlalu besar memakan memori berlebih dan memperpanjang rollback saat gagal.

ChunkSizingConfig.javajava
// Konfigurasi dinamis ukuran chunk
@Configuration
public class ChunkSizingConfig {

    // Default yang masuk akal untuk sebagian besar kasus
    private static final int DEFAULT_CHUNK_SIZE = 100;

    // Untuk item ringan (sedikit field)
    private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;

    // Untuk item berat (blob, dokumen)
    private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;

    @Bean
    public Step processLightDataStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<LightRecord> reader,
                                      ItemWriter<LightRecord> writer) {
        return new StepBuilder("processLightDataStep", jobRepository)
                // Item ringan: chunk lebih besar untuk lebih sedikit commit
                .<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .writer(writer)
                .build();
    }

    @Bean
    public Step processDocumentsStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<Document> reader,
                                      ItemProcessor<Document, ProcessedDocument> processor,
                                      ItemWriter<ProcessedDocument> writer) {
        return new StepBuilder("processDocumentsStep", jobRepository)
                // Dokumen berat: chunk lebih kecil untuk membatasi memori
                .<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
Aturan praktis

Mulai dengan 100 item per chunk, lalu sesuaikan berdasarkan metrik: waktu commit, penggunaan memori, dan durasi rollback. Gunakan listener untuk memantau dan menemukan titik optimal.

Partitioning untuk pemrosesan paralel

Pertanyaan 5: Apa itu partitioning dan kapan menggunakannya?

Partitioning membagi dataset menjadi partisi independen yang diproses paralel. Setiap partisi berjalan di thread sendiri (lokal) atau pada worker remote. Pendekatan ini melipatgandakan throughput tanpa mengorbankan kemampuan restart.

PartitionedJobConfig.javajava
// Konfigurasi job berpartisi
@Configuration
public class PartitionedJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public PartitionedJobConfig(JobRepository jobRepository,
                                 PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    @Bean
    public Job partitionedImportJob(Step partitionedStep) {
        return new JobBuilder("partitionedImportJob", jobRepository)
                .start(partitionedStep)
                .build();
    }

    // Manager step: mengorkestrasi partisi
    @Bean
    public Step partitionedStep(Partitioner partitioner,
                                 Step workerStep,
                                 TaskExecutor taskExecutor) {
        return new StepBuilder("partitionedStep", jobRepository)
                // Membagi pekerjaan melalui Partitioner
                .partitioner("workerStep", partitioner)
                // Step yang dieksekusi untuk setiap partisi
                .step(workerStep)
                // 8 thread paralel
                .taskExecutor(taskExecutor)
                // Jumlah partisi yang dibuat
                .gridSize(8)
                .build();
    }

    // TaskExecutor untuk eksekusi paralel
    @Bean
    public TaskExecutor batchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("batch-partition-");
        executor.initialize();
        return executor;
    }
}
RangePartitioner.javajava
// Partitioner berbasis rentang ID
@Component
public class RangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public RangePartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Mengambil batas dataset
        Long minId = jdbcTemplate.queryForObject(
                "SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
        Long maxId = jdbcTemplate.queryForObject(
                "SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);

        if (minId == null || maxId == null) {
            return Map.of();  // Tidak ada data untuk diproses
        }

        // Menghitung ukuran tiap partisi
        long range = (maxId - minId) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new HashMap<>();

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            long start = minId + (i * range);
            long end = Math.min(start + range - 1, maxId);

            // Tiap partisi menerima batasnya
            context.putLong("minId", start);
            context.putLong("maxId", end);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

Partitioning cocok untuk dataset besar dengan item independen. Partisi harus seimbang agar partisi lambat tidak memperlambat seluruh job.

Pertanyaan 6: Apa perbedaan partitioning lokal dan remote?

Partitioning lokal menjalankan semua partisi di JVM yang sama dengan thread pool. Partitioning remote mendistribusikan partisi ke beberapa JVM (worker) melalui middleware messaging.

RemotePartitioningConfig.javajava
// Konfigurasi partitioning remote dengan messaging
@Configuration
public class RemotePartitioningConfig {

    @Bean
    public Step managerStep(JobRepository jobRepository,
                             Partitioner partitioner,
                             MessageChannelPartitionHandler partitionHandler) {
        return new StepBuilder("managerStep", jobRepository)
                .partitioner("workerStep", partitioner)
                // Handler yang berkomunikasi dengan worker remote
                .partitionHandler(partitionHandler)
                .build();
    }

    // PartitionHandler mengirim ExecutionContext ke worker
    @Bean
    public MessageChannelPartitionHandler partitionHandler(
            MessagingTemplate messagingTemplate,
            JobExplorer jobExplorer) {
        MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
        handler.setStepName("workerStep");
        handler.setGridSize(4);
        handler.setMessagingOperations(messagingTemplate);
        handler.setJobExplorer(jobExplorer);
        // Timeout menunggu worker selesai
        handler.setPollInterval(5000L);
        return handler;
    }
}
WorkerConfiguration.javajava
// Konfigurasi sisi worker
@Configuration
public class WorkerConfiguration {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public WorkerConfiguration(JobRepository jobRepository,
                                PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Worker menerima partisi dan menjalankan step
    @Bean
    public Step workerStep(ItemReader<OrderRecord> reader,
                            ItemProcessor<OrderRecord, ProcessedOrder> processor,
                            ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("workerStep", jobRepository)
                .<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
                // Reader dengan @StepScope untuk menerima parameter partisi
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    // Reader yang memakai batas partisi
    @Bean
    @StepScope
    public JdbcCursorItemReader<OrderRecord> partitionedReader(
            DataSource dataSource,
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {
        return new JdbcCursorItemReaderBuilder<OrderRecord>()
                .name("partitionedOrderReader")
                .dataSource(dataSource)
                .sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
                .preparedStatementSetter(ps -> {
                    ps.setLong(1, minId);
                    ps.setLong(2, maxId);
                })
                .rowMapper(new OrderRecordRowMapper())
                .build();
    }
}

Siap menguasai wawancara Spring Boot Anda?

Berlatih dengan simulator interaktif, flashcards, dan tes teknis kami.

Toleransi kegagalan dan pemulihan kesalahan

Pertanyaan 7: Mekanisme toleransi kegagalan apa yang ditawarkan Spring Batch?

Spring Batch menawarkan tiga mekanisme komplementer: skip (mengabaikan item yang gagal), retry (mencoba ulang otomatis), dan restart (melanjutkan job yang gagal). Mekanisme ini dikonfigurasi di tingkat step.

FaultTolerantStepConfig.javajava
// Konfigurasi toleransi kegagalan lengkap
@Configuration
public class FaultTolerantStepConfig {

    @Bean
    public Step faultTolerantStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<DataRecord> reader,
                                   ItemProcessor<DataRecord, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   SkipPolicy customSkipPolicy) {
        return new StepBuilder("faultTolerantStep", jobRepository)
                .<DataRecord, ProcessedRecord>chunk(100, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Mengaktifkan mode toleran terhadap kegagalan
                .faultTolerant()
                // SKIP: mengabaikan hingga 10 error validasi
                .skipLimit(10)
                .skip(ValidationException.class)
                .skip(DataIntegrityViolationException.class)
                // Beberapa error tidak boleh dilewati
                .noSkip(FatalBatchException.class)
                // RETRY: mencoba ulang error transien
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .retry(DeadlockLoserDataAccessException.class)
                // Backoff eksponensial antar retry
                .backOffPolicy(exponentialBackOffPolicy())
                // Listener untuk mencatat skip
                .listener(skipListener())
                .build();
    }

    @Bean
    public BackOffPolicy exponentialBackOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);  // 1 detik
        policy.setMultiplier(2.0);         // Berlipat setiap retry
        policy.setMaxInterval(10000);      // Maksimum 10 detik
        return policy;
    }

    @Bean
    public SkipListener<DataRecord, ProcessedRecord> skipListener() {
        return new SkipListener<>() {
            @Override
            public void onSkipInRead(Throwable t) {
                // Catat item yang tidak terbaca
            }

            @Override
            public void onSkipInProcess(DataRecord item, Throwable t) {
                // Catat item yang gagal saat diproses
            }

            @Override
            public void onSkipInWrite(ProcessedRecord item, Throwable t) {
                // Catat item yang gagal saat ditulis
            }
        };
    }
}

Retry cocok untuk error transien (timeout jaringan, deadlock database). Skip cocok untuk error data individual yang tidak seharusnya memblokir pemrosesan keseluruhan.

Pertanyaan 8: Bagaimana mengimplementasikan SkipPolicy kustom?

SkipPolicy kustom memungkinkan logika keputusan halus: skip berdasarkan tipe exception, jumlah error, atau kriteria bisnis spesifik.

AdaptiveSkipPolicy.javajava
// SkipPolicy dengan logika bisnis lanjutan
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {

    private static final int MAX_SKIP_COUNT = 100;
    private static final double MAX_SKIP_PERCENTAGE = 0.05;  // 5% maksimum

    private final AtomicInteger totalProcessed = new AtomicInteger(0);
    private final AtomicInteger skipCount = new AtomicInteger(0);

    @Override
    public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
        // Jangan pernah melewatkan error fatal
        if (exception instanceof FatalBatchException
                || exception instanceof OutOfMemoryError) {
            return false;
        }

        // Batas absolut skip
        if (skipCountSoFar >= MAX_SKIP_COUNT) {
            return false;  // Hentikan job
        }

        // Batas persentase
        int total = totalProcessed.get();
        if (total > 1000) {  // Terapkan hanya setelah warmup
            double skipPercentage = (double) skipCountSoFar / total;
            if (skipPercentage > MAX_SKIP_PERCENTAGE) {
                return false;  // Terlalu banyak error secara proporsional
            }
        }

        // Lewati error validasi dan data
        return exception instanceof ValidationException
                || exception instanceof DataFormatException
                || exception instanceof IllegalArgumentException;
    }

    // Dipanggil oleh listener untuk memantau progres
    public void incrementProcessed() {
        totalProcessed.incrementAndGet();
    }
}

Pertanyaan 9: Bagaimana restart job yang gagal bekerja?

JobRepository menyimpan status setiap eksekusi. Saat restart, Spring Batch mengidentifikasi chunk terakhir yang dikomit dan melanjutkan dari titik tersebut. Item yang sudah berhasil diproses tidak diproses ulang.

JobRestartService.javajava
// Layanan manajemen restart job
@Service
public class JobRestartService {

    private final JobLauncher jobLauncher;
    private final JobExplorer jobExplorer;
    private final JobRepository jobRepository;
    private final Job importJob;

    public JobRestartService(JobLauncher jobLauncher,
                              JobExplorer jobExplorer,
                              JobRepository jobRepository,
                              @Qualifier("importJob") Job importJob) {
        this.jobLauncher = jobLauncher;
        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
        this.importJob = importJob;
    }

    public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
        // Mengambil eksekusi yang gagal
        JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);

        if (failedExecution == null) {
            throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
        }

        // Memverifikasi job dapat di-restart
        if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
            throw new IllegalStateException("Only FAILED jobs can be restarted");
        }

        // Menggunakan parameter yang sama dengan eksekusi awal
        JobParameters originalParams = failedExecution.getJobParameters();

        // Meluncurkan ulang job - otomatis melanjutkan dari checkpoint terakhir
        return jobLauncher.run(importJob, originalParams);
    }

    public List<JobExecution> findRestartableJobs() {
        // Daftar semua eksekusi FAILED yang belum di-restart
        return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.FAILED)
                .filter(this::isRestartable)
                .toList();
    }

    private boolean isRestartable(JobExecution execution) {
        // Memverifikasi tidak ada eksekusi sukses yang lebih baru
        JobInstance instance = execution.getJobInstance();
        return jobExplorer.getJobExecutions(instance).stream()
                .noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
    }
}
Jebakan wawancara

Job hanya dapat di-restart jika JobParameters identik. Mengubah parameter membuat instance job baru dan menghilangkan riwayat progres.

Penskalaan dan optimisasi

Pertanyaan 10: Strategi penskalaan apa yang tersedia?

Spring Batch menawarkan empat strategi: multi-threaded step (banyak thread membaca paralel), parallel steps (step independen paralel), remote chunking (pemrosesan terdistribusi), dan partitioning (data terdistribusi).

MultiThreadedStepConfig.javajava
// Step multi-thread: banyak thread memproses dataset yang sama
@Configuration
public class MultiThreadedStepConfig {

    @Bean
    public Step multiThreadedStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<Record> reader,
                                   ItemProcessor<Record, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   TaskExecutor taskExecutor) {
        return new StepBuilder("multiThreadedStep", jobRepository)
                .<Record, ProcessedRecord>chunk(100, transactionManager)
                // PERHATIAN: reader harus thread-safe
                .reader(synchronizedReader(reader))
                .processor(processor)
                .writer(writer)
                // 4 thread memproses chunk paralel
                .taskExecutor(taskExecutor)
                .throttleLimit(4)
                .build();
    }

    // Wrapper untuk membuat reader thread-safe
    private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
        SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
        syncReader.setDelegate((ItemStreamReader<Record>) reader);
        return syncReader;
    }
}
ParallelStepsConfig.javajava
// Eksekusi step independen secara paralel
@Configuration
public class ParallelStepsConfig {

    @Bean
    public Job parallelJob(JobRepository jobRepository,
                            Step loadCustomersStep,
                            Step loadProductsStep,
                            Step loadOrdersStep,
                            Step processDataStep) {
        // Flow paralel: customers dan products dimuat bersamaan
        Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
                .start(loadCustomersStep)
                .build();

        Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
                .start(loadProductsStep)
                .build();

        Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
                .start(loadOrdersStep)
                .build();

        // Split menjalankan flow secara paralel
        return new JobBuilder("parallelJob", jobRepository)
                .start(new FlowBuilder<Flow>("parallelLoadFlow")
                        .split(new SimpleAsyncTaskExecutor())
                        .add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
                        .build())
                // Setelah load paralel, pemrosesan berurutan
                .next(processDataStep)
                .build()
                .build();
    }
}

Multi-threading cocok jika reader bisa disinkronkan. Partitioning lebih disukai untuk volume besar karena setiap partisi punya reader sendiri tanpa kontensi.

Pertanyaan 11: Bagaimana memantau performa job?

Spring Batch mengekspos metrik melalui listener dan JobRepository. Integrasi dengan Micrometer memungkinkan ekspor ke Prometheus, Grafana, atau sistem monitoring lain.

BatchMetricsConfig.javajava
// Konfigurasi monitoring dengan Micrometer
@Configuration
public class BatchMetricsConfig {

    private final MeterRegistry meterRegistry;

    public BatchMetricsConfig(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Bean
    public JobExecutionListener metricsJobListener() {
        return new JobExecutionListener() {

            private Timer.Sample jobTimer;

            @Override
            public void beforeJob(JobExecution jobExecution) {
                // Memulai timer durasi job
                jobTimer = Timer.start(meterRegistry);
                Counter.builder("batch.job.started")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .register(meterRegistry)
                        .increment();
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                // Mencatat durasi total
                jobTimer.stop(Timer.builder("batch.job.duration")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry));

                // Counter job per status
                Counter.builder("batch.job.completed")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry)
                        .increment();
            }
        };
    }

    @Bean
    public StepExecutionListener metricsStepListener() {
        return new StepExecutionListener() {

            @Override
            public void afterStep(StepExecution stepExecution) {
                String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
                String stepName = stepExecution.getStepName();

                // Metrik throughput
                Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                return null;
            }
        };
    }
}

Pertanyaan 12: Apa jebakan umum saat partitioning?

Kesalahan yang sering muncul: partisi tidak seimbang (satu partisi memuat 90% data), reader yang tidak thread-safe, dan manajemen state yang salah antar partisi.

BalancedPartitioner.javajava
// Partitioner yang benar-benar menyeimbangkan beban
@Component
public class BalancedPartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Menghitung total item yang akan diproses
        Integer totalCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);

        if (totalCount == null || totalCount == 0) {
            return Map.of();
        }

        // Menghitung target ukuran per partisi
        int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);

        Map<String, ExecutionContext> partitions = new HashMap<>();

        // Memakai OFFSET/LIMIT untuk partisi seimbang
        // Lebih mahal daripada range tetapi menjamin keseimbangan
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("offset", i * itemsPerPartition);
            context.putInt("limit", itemsPerPartition);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

// OffsetBasedReader.java
// Reader kompatibel dengan partitioning berbasis offset
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {

    private final JdbcTemplate jdbcTemplate;
    private Iterator<OrderRecord> iterator;

    @Value("#{stepExecutionContext['offset']}")
    private int offset;

    @Value("#{stepExecutionContext['limit']}")
    private int limit;

    public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // Memuat persis bagian yang ditugaskan ke partisi ini
        List<OrderRecord> records = jdbcTemplate.query(
                "SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
                new OrderRecordRowMapper(),
                limit, offset
        );
        this.iterator = records.iterator();
    }

    @Override
    public OrderRecord read() {
        return iterator.hasNext() ? iterator.next() : null;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // Penyimpanan state untuk restart bila perlu
    }

    @Override
    public void close() {
        // Pembersihan
    }
}

Pertanyaan lanjutan untuk senior

Pertanyaan 13: Bagaimana mengelola dependensi antar job?

Spring Batch tidak mengelola dependensi antar job secara native. Solusinya: orkestrator eksternal (Airflow, Kubernetes CronJob) atau implementasi kustom dengan JobExplorer.

JobDependencyService.javajava
// Pengelolaan dependensi antar job
@Service
public class JobDependencyService {

    private final JobExplorer jobExplorer;
    private final JobLauncher jobLauncher;
    private final Map<String, Job> jobs;

    public JobDependencyService(JobExplorer jobExplorer,
                                  JobLauncher jobLauncher,
                                  Map<String, Job> jobs) {
        this.jobExplorer = jobExplorer;
        this.jobLauncher = jobLauncher;
        this.jobs = jobs;
    }

    public JobExecution runWithDependencies(String jobName,
                                             JobParameters params,
                                             List<String> dependsOn) throws Exception {
        // Memverifikasi semua dependensi berhasil
        for (String dependency : dependsOn) {
            if (!hasSuccessfulExecution(dependency, params)) {
                throw new JobExecutionException(
                        "Dependency not satisfied: " + dependency);
            }
        }

        Job job = jobs.get(jobName);
        if (job == null) {
            throw new IllegalArgumentException("Unknown job: " + jobName);
        }

        return jobLauncher.run(job, params);
    }

    private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
        // Mencari eksekusi COMPLETED dengan parameter bisnis yang sama
        return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
                .anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
    }

    private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
        // Membandingkan parameter bisnis (mengabaikan timestamp eksekusi)
        String actualDate = actual.getString("businessDate");
        String expectedDate = expected.getString("businessDate");
        return Objects.equals(actualDate, expectedDate);
    }
}

Pertanyaan 14: Bagaimana menguji job Spring Batch secara efektif?

Menguji job Spring Batch membutuhkan pendekatan berlapis: unit test untuk komponen (reader, processor, writer), integration test untuk step, dan end-to-end test untuk job lengkap.

OrderProcessorTest.javajava
// Unit test processor
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {

    @Mock
    private PricingService pricingService;

    @Mock
    private ValidationService validationService;

    @InjectMocks
    private OrderItemProcessor processor;

    @Test
    void shouldProcessValidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(true);
        when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));

        // When
        ProcessedOrder result = processor.process(input);

        // Then
        assertThat(result).isNotNull();
        assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
    }

    @Test
    void shouldFilterInvalidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(false);

        // When
        ProcessedOrder result = processor.process(input);

        // Then - null berarti difilter
        assertThat(result).isNull();
        verify(pricingService, never()).calculatePrice(any());
    }
}
ImportJobIntegrationTest.javajava
// Integration test untuk job lengkap
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @BeforeEach
    void setup() {
        // Membersihkan metadata antar test
        jobRepositoryTestUtils.removeJobExecutions();
        // Mereset data uji
        jdbcTemplate.execute("DELETE FROM processed_orders");
        jdbcTemplate.execute("DELETE FROM orders");
    }

    @Test
    void shouldCompleteJobSuccessfully() throws Exception {
        // Given - data uji
        insertTestOrders(100);

        // When
        JobParameters params = new JobParametersBuilder()
                .addLocalDate("businessDate", LocalDate.now())
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncherTestUtils.launchJob(params);

        // Then
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(countProcessedOrders()).isEqualTo(100);
    }

    @Test
    void shouldHandleEmptyDataset() throws Exception {
        // Given - tanpa data

        // When
        JobExecution execution = jobLauncherTestUtils.launchJob();

        // Then - job berhasil meski tanpa data
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    @Test
    void shouldRestartFromFailurePoint() throws Exception {
        // Given - mensimulasikan error di tengah pemrosesan
        insertTestOrders(100);
        insertPoisonOrder(50);  // Memicu error

        // When - eksekusi pertama gagal
        JobExecution firstExecution = jobLauncherTestUtils.launchJob();
        assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);

        // Memperbaiki data
        removePoisonOrder(50);

        // When - restart
        JobExecution restartExecution = jobLauncherTestUtils.launchJob(
                firstExecution.getJobParameters());

        // Then - melanjutkan dari titik kegagalan
        assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    private void insertTestOrders(int count) {
        for (int i = 1; i <= count; i++) {
            jdbcTemplate.update(
                    "INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
                    i, i * 10, BigDecimal.valueOf(i * 10));
        }
    }

    private int countProcessedOrders() {
        return jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processed_orders", Integer.class);
    }
}

Pertanyaan 15: Bagaimana mengoptimalkan performa penulisan ke database?

Penulisan sering menjadi bottleneck. Optimasi meliputi: JDBC batch insert, menonaktifkan constraint saat loading, dan penggunaan staging table.

OptimizedJdbcWriter.javajava
// Writer dioptimasi untuk volume besar
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;
    private final DataSource dataSource;

    public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
        this.jdbcTemplate = jdbcTemplate;
        this.dataSource = dataSource;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
        List<? extends ProcessedOrder> items = chunk.getItems();

        if (items.isEmpty()) {
            return;
        }

        // Memakai PreparedStatement dengan batch
        try (Connection connection = dataSource.getConnection();
             PreparedStatement ps = connection.prepareStatement(
                     "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
                             "VALUES (?, ?, ?, ?)")) {

            for (ProcessedOrder order : items) {
                ps.setLong(1, order.orderId());
                ps.setLong(2, order.customerId());
                ps.setBigDecimal(3, order.finalPrice());
                ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                ps.addBatch();
            }

            // Mengeksekusi semua insert dalam satu operasi jaringan
            ps.executeBatch();
        }
    }
}

// StagingTableWriter.java
// Pola staging table untuk volume sangat besar
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {

    private final JdbcTemplate jdbcTemplate;
    private String stagingTable;

    public StagingTableWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Membuat tabel sementara untuk step ini
        stagingTable = "staging_orders_" + stepExecution.getId();
        jdbcTemplate.execute(
                "CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // Menulis ke staging table (tanpa constraint FK)
        String sql = "INSERT INTO " + stagingTable +
                " (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";

        jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                });
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
            // Bulk copy ke tabel akhir
            jdbcTemplate.execute(
                    "INSERT INTO processed_orders SELECT * FROM " + stagingTable);
        }
        // Membersihkan staging table
        jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
        return stepExecution.getExitStatus();
    }
}

Kesimpulan

Menguasai Spring Batch 5 di wawancara teknis bersandar pada pemahaman mendalam tentang mekanisme internal:

Arsitektur: Job → Step → Chunk (Reader, Processor, Writer)

Pemrosesan chunk: penentuan ukuran, siklus hidup, transaksi

Partitioning: lokal vs remote, penyeimbangan partisi

Toleransi kegagalan: skip, retry, restart dengan kebijakan tepat

Penskalaan: multi-threading, parallel steps, remote chunking

Pengujian: unit, integrasi, end-to-end

Optimasi: batch writes, staging table, monitoring

Pertanyaan lanjutan menguji kemampuan membenarkan keputusan arsitektur sesuai konteks: volume data, batasan waktu, toleransi kesalahan, dan infrastruktur yang tersedia.

Mulai berlatih!

Uji pengetahuan Anda dengan simulator wawancara dan tes teknis kami.

Tag

#spring batch
#spring boot
#java
#batch processing
#interview questions

Bagikan

Artikel terkait