Spring Batch 5 Interview: Partitioning, Chunks en Fault Tolerance

Slaag voor Spring Batch 5 interviews: 15 essentiële vragen over partitioning, chunk-verwerking en fault tolerance met Java 21 codevoorbeelden.

Spring Batch 5 Interview: partitioning, chunks en fault tolerance

Spring Batch 5 vormt een hoeksteen voor enterprise dataverwerking in het Spring-ecosysteem. Technische interviews beoordelen het vermogen om robuuste, schaalbare en fouttolerante jobs te ontwerpen. Beheersing van partitioning, chunk-georiënteerde verwerking en fault tolerance-mechanismen onderscheidt senior ontwikkelaars.

Interview-focus

Recruiters testen diepgaand begrip: waarom partitioning kiezen boven remote chunking? Hoe chunks correct dimensioneren? Deze architectuurkeuzes onthullen echte productie-ervaring.

Kernarchitectuur van Spring Batch 5

Vraag 1: Wat zijn de belangrijkste componenten van Spring Batch?

De Spring Batch-architectuur bestaat uit drie lagen: de applicatie (jobs en businesscode), Batch Core (runtime-klassen om jobs te starten en aan te sturen) en de infrastructuur (gemeenschappelijke readers, writers en services zoals RetryTemplate).

BatchJobConfig.javajava
// Spring Batch 5 job-configuratie met Java 21
@Configuration
public class BatchJobConfig {

    // JobRepository slaat de uitvoeringsmetadata op
    // Maakt restart en job-monitoring mogelijk
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public BatchJobConfig(JobRepository jobRepository,
                          PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Een Job omvat het volledige batchproces
    // Bestaat uit een of meer Steps die sequentieel worden uitgevoerd
    @Bean
    public Job importUserJob(Step processUsersStep, Step cleanupStep) {
        return new JobBuilder("importUserJob", jobRepository)
                .start(processUsersStep)      // Hoofdverwerkingsstap
                .next(cleanupStep)             // Opruimstap
                .build();
    }

    // Een Step is een onafhankelijke werkeenheid
    // Twee modellen: Tasklet (enkele taak) of Chunk (iteratieve verwerking)
    @Bean
    public Step processUsersStep(ItemReader<UserRecord> reader,
                                  ItemProcessor<UserRecord, User> processor,
                                  ItemWriter<User> writer) {
        return new StepBuilder("processUsersStep", jobRepository)
                .<UserRecord, User>chunk(100, transactionManager)  // Commit elke 100 items
                .reader(reader)       // Leest brondata
                .processor(processor) // Transformeert elk item
                .writer(writer)       // Schrijft in batches van 100
                .build();
    }
}

De JobRepository persisteert de uitvoeringsstatus naar de database. Deze persistentie maakt het mogelijk een mislukte job exact te hervatten waar deze stopte, zonder reeds gecommitte data opnieuw te verwerken.

Vraag 2: Wat is het verschil tussen Tasklet en chunk-georiënteerde verwerking?

Tasklet voert een discrete, niet-iteratieve actie uit: bestand verwijderen, stored procedure aanroepen, notificatie-e-mail versturen. Chunk verwerkt grote volumes door data op te splitsen in beheersbare batches.

CleanupTasklet.javajava
// Tasklet: enkele actie zonder iteratie
@Component
public class CleanupTasklet implements Tasklet {

    private final Path tempDirectory = Path.of("/tmp/batch-work");

    @Override
    public RepeatStatus execute(StepContribution contribution,
                                 ChunkContext chunkContext) throws Exception {
        // Verwijdert alle tijdelijke verwerkingsbestanden
        try (var files = Files.walk(tempDirectory)) {
            files.filter(Files::isRegularFile)
                 .forEach(this::deleteQuietly);
        }

        // FINISHED geeft aan dat de tasklet zijn werk heeft voltooid
        // CONTINUABLE zou de uitvoering herstarten (handig voor polling)
        return RepeatStatus.FINISHED;
    }

    private void deleteQuietly(Path file) {
        try {
            Files.delete(file);
        } catch (IOException e) {
            // Loggen en doorgaan - job niet falen om één bestand
        }
    }
}
ChunkProcessingConfig.javajava
// Chunk-verwerking: high-volume verwerking
@Configuration
public class ChunkProcessingConfig {

    @Bean
    public Step processOrdersStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<OrderRecord> reader,
                                   ItemProcessor<OrderRecord, ProcessedOrder> processor,
                                   ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("processOrdersStep", jobRepository)
                // Chunk van 500: leest 500 items, verwerkt, schrijft, committeert
                .<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Listener om voortgang te monitoren
                .listener(new ChunkProgressListener())
                .build();
    }
}

Chunk-georiënteerde verwerking biedt cruciale voordelen: geoptimaliseerd geheugenbeheer (alleen huidige chunk in geheugen), granulaire transacties (commit per chunk) en herstel bij de laatste gecommitte chunk.

Diepgaande chunk-georiënteerde verwerking

Vraag 3: Hoe werkt de chunk-levenscyclus?

Elke chunk volgt een precieze cyclus: items één voor één lezen tot de geconfigureerde grootte, elk item individueel verwerken, daarna de groep schrijven. Een transactie omhult de hele chunk.

OrderItemReader.javajava
// ItemReader: leest één item per keer
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {

    // @StepScope: nieuwe instantie per step-uitvoering
    // Maakt het injecteren van dynamische job-parameters mogelijk
    @Value("#{jobParameters['startDate']}")
    private LocalDate startDate;

    private Iterator<OrderRecord> orderIterator;

    @BeforeStep
    public void initializeReader(StepExecution stepExecution) {
        // Laadt data bij start van de step
        List<OrderRecord> orders = fetchOrdersFromDate(startDate);
        this.orderIterator = orders.iterator();
    }

    @Override
    public OrderRecord read() {
        // Geeft null terug om einde data te signaleren
        // Spring Batch roept read() aan tot null wordt ontvangen
        if (orderIterator.hasNext()) {
            return orderIterator.next();
        }
        return null;  // Einde dataset
    }

    private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
        // Haalt op uit databron
        return List.of();  // Werkelijke implementatie
    }
}
OrderItemProcessor.javajava
// ItemProcessor: transformeert elk item afzonderlijk
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {

    private final PricingService pricingService;
    private final ValidationService validationService;

    public OrderItemProcessor(PricingService pricingService,
                               ValidationService validationService) {
        this.pricingService = pricingService;
        this.validationService = validationService;
    }

    @Override
    public ProcessedOrder process(OrderRecord item) {
        // null teruggeven filtert het item (wordt niet geschreven)
        if (!validationService.isValid(item)) {
            return null;  // Item gefilterd
        }

        // Business-transformatie
        BigDecimal finalPrice = pricingService.calculatePrice(item);

        return new ProcessedOrder(
                item.orderId(),
                item.customerId(),
                finalPrice,
                LocalDateTime.now()
        );
    }
}
OrderItemWriter.javajava
// ItemWriter: schrijft de complete chunk in één operatie
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;

    public OrderItemWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // De chunk bevat alle verwerkte items
        // Batchschrijven voor optimale performance
        List<? extends ProcessedOrder> items = chunk.getItems();

        jdbcTemplate.batchUpdate(
                "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
                items,
                items.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                }
        );
    }
}

Als er tijdens chunk-verwerking een exception optreedt, wordt de transactie teruggedraaid. De job kan dan vanaf die chunk hervatten met de metadata in het JobRepository.

Vraag 4: Hoe kies je de optimale chunk-grootte?

De chunk-grootte beïnvloedt direct performance en geheugengebruik. Een te kleine chunk vermenigvuldigt commits (overhead). Een te grote chunk verbruikt buitensporig veel geheugen en verlengt rollbacks bij falen.

ChunkSizingConfig.javajava
// Dynamische configuratie van chunk-grootte
@Configuration
public class ChunkSizingConfig {

    // Redelijke standaard voor de meeste gevallen
    private static final int DEFAULT_CHUNK_SIZE = 100;

    // Voor lichte items (weinig velden)
    private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;

    // Voor zware items (blobs, documenten)
    private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;

    @Bean
    public Step processLightDataStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<LightRecord> reader,
                                      ItemWriter<LightRecord> writer) {
        return new StepBuilder("processLightDataStep", jobRepository)
                // Lichte items: grotere chunks voor minder commits
                .<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .writer(writer)
                .build();
    }

    @Bean
    public Step processDocumentsStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<Document> reader,
                                      ItemProcessor<Document, ProcessedDocument> processor,
                                      ItemWriter<ProcessedDocument> writer) {
        return new StepBuilder("processDocumentsStep", jobRepository)
                // Zware documenten: kleinere chunks om geheugen te beperken
                .<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
Vuistregel

Begin met 100 items per chunk en pas aan op basis van metrieken: committijd, geheugengebruik en rollback-duur. Gebruik listeners om het optimum te identificeren.

Partitioning voor parallelle verwerking

Vraag 5: Wat is partitioning en wanneer gebruik je het?

Partitioning verdeelt een dataset in onafhankelijke partities die parallel worden verwerkt. Elke partitie draait in een eigen thread (lokaal) of op een remote worker. Deze aanpak vermenigvuldigt de doorvoer zonder restart-mogelijkheden op te offeren.

PartitionedJobConfig.javajava
// Configuratie van een gepartitioneerde job
@Configuration
public class PartitionedJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public PartitionedJobConfig(JobRepository jobRepository,
                                 PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    @Bean
    public Job partitionedImportJob(Step partitionedStep) {
        return new JobBuilder("partitionedImportJob", jobRepository)
                .start(partitionedStep)
                .build();
    }

    // Manager-step: orkestreert de partities
    @Bean
    public Step partitionedStep(Partitioner partitioner,
                                 Step workerStep,
                                 TaskExecutor taskExecutor) {
        return new StepBuilder("partitionedStep", jobRepository)
                // Verdeelt het werk via de Partitioner
                .partitioner("workerStep", partitioner)
                // Step uitgevoerd voor elke partitie
                .step(workerStep)
                // 8 parallelle threads
                .taskExecutor(taskExecutor)
                // Aantal te creëren partities
                .gridSize(8)
                .build();
    }

    // TaskExecutor voor parallelle uitvoering
    @Bean
    public TaskExecutor batchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("batch-partition-");
        executor.initialize();
        return executor;
    }
}
RangePartitioner.javajava
// Partitioner gebaseerd op ID-bereiken
@Component
public class RangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public RangePartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Haalt de grenzen van de dataset op
        Long minId = jdbcTemplate.queryForObject(
                "SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
        Long maxId = jdbcTemplate.queryForObject(
                "SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);

        if (minId == null || maxId == null) {
            return Map.of();  // Geen data om te verwerken
        }

        // Berekent de grootte van elke partitie
        long range = (maxId - minId) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new HashMap<>();

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            long start = minId + (i * range);
            long end = Math.min(start + range - 1, maxId);

            // Elke partitie krijgt zijn grenzen
            context.putLong("minId", start);
            context.putLong("maxId", end);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

Partitioning is geschikt voor grote datasets waarin items onafhankelijk zijn. Partities moeten gebalanceerd zijn om te voorkomen dat een trage partitie de hele job vertraagt.

Vraag 6: Wat is het verschil tussen lokale en remote partitioning?

Lokale partitioning voert alle partities uit op dezelfde JVM met een thread pool. Remote partitioning verdeelt partities over meerdere JVM's (workers) via messaging-middleware.

RemotePartitioningConfig.javajava
// Remote partitioning-configuratie met messaging
@Configuration
public class RemotePartitioningConfig {

    @Bean
    public Step managerStep(JobRepository jobRepository,
                             Partitioner partitioner,
                             MessageChannelPartitionHandler partitionHandler) {
        return new StepBuilder("managerStep", jobRepository)
                .partitioner("workerStep", partitioner)
                // Handler die communiceert met remote workers
                .partitionHandler(partitionHandler)
                .build();
    }

    // PartitionHandler stuurt ExecutionContexts naar workers
    @Bean
    public MessageChannelPartitionHandler partitionHandler(
            MessagingTemplate messagingTemplate,
            JobExplorer jobExplorer) {
        MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
        handler.setStepName("workerStep");
        handler.setGridSize(4);
        handler.setMessagingOperations(messagingTemplate);
        handler.setJobExplorer(jobExplorer);
        // Timeout voor wachten op worker-voltooiing
        handler.setPollInterval(5000L);
        return handler;
    }
}
WorkerConfiguration.javajava
// Configuratie aan workerzijde
@Configuration
public class WorkerConfiguration {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public WorkerConfiguration(JobRepository jobRepository,
                                PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // De worker ontvangt partities en voert de step uit
    @Bean
    public Step workerStep(ItemReader<OrderRecord> reader,
                            ItemProcessor<OrderRecord, ProcessedOrder> processor,
                            ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("workerStep", jobRepository)
                .<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
                // Reader met @StepScope voor partitieparameters
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    // Reader die partitiegrenzen gebruikt
    @Bean
    @StepScope
    public JdbcCursorItemReader<OrderRecord> partitionedReader(
            DataSource dataSource,
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {
        return new JdbcCursorItemReaderBuilder<OrderRecord>()
                .name("partitionedOrderReader")
                .dataSource(dataSource)
                .sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
                .preparedStatementSetter(ps -> {
                    ps.setLong(1, minId);
                    ps.setLong(2, maxId);
                })
                .rowMapper(new OrderRecordRowMapper())
                .build();
    }
}

Klaar om je Spring Boot gesprekken te halen?

Oefen met onze interactieve simulatoren, flashcards en technische tests.

Fault tolerance en foutherstel

Vraag 7: Welke fault tolerance-mechanismen biedt Spring Batch?

Spring Batch biedt drie complementaire mechanismen: skip (falende items negeren), retry (automatisch opnieuw proberen) en restart (een mislukte job hervatten). Deze mechanismen worden geconfigureerd op stepniveau.

FaultTolerantStepConfig.javajava
// Volledige fault tolerance-configuratie
@Configuration
public class FaultTolerantStepConfig {

    @Bean
    public Step faultTolerantStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<DataRecord> reader,
                                   ItemProcessor<DataRecord, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   SkipPolicy customSkipPolicy) {
        return new StepBuilder("faultTolerantStep", jobRepository)
                .<DataRecord, ProcessedRecord>chunk(100, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Activeert fault tolerant-modus
                .faultTolerant()
                // SKIP: negeert tot 10 validatiefouten
                .skipLimit(10)
                .skip(ValidationException.class)
                .skip(DataIntegrityViolationException.class)
                // Sommige fouten mogen nooit worden overgeslagen
                .noSkip(FatalBatchException.class)
                // RETRY: retry voor tijdelijke fouten
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .retry(DeadlockLoserDataAccessException.class)
                // Exponentiële backoff tussen retries
                .backOffPolicy(exponentialBackOffPolicy())
                // Listener om skips te loggen
                .listener(skipListener())
                .build();
    }

    @Bean
    public BackOffPolicy exponentialBackOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);  // 1 seconde
        policy.setMultiplier(2.0);         // Verdubbelt elke retry
        policy.setMaxInterval(10000);      // Max 10 seconden
        return policy;
    }

    @Bean
    public SkipListener<DataRecord, ProcessedRecord> skipListener() {
        return new SkipListener<>() {
            @Override
            public void onSkipInRead(Throwable t) {
                // Log onleesbaar item
            }

            @Override
            public void onSkipInProcess(DataRecord item, Throwable t) {
                // Log item dat faalde bij verwerking
            }

            @Override
            public void onSkipInWrite(ProcessedRecord item, Throwable t) {
                // Log item dat faalde bij schrijven
            }
        };
    }
}

Retry past bij tijdelijke fouten (netwerk-timeout, database-deadlock). Skip past bij individuele datafouten die de globale verwerking niet mogen blokkeren.

Vraag 8: Hoe implementeer je een eigen SkipPolicy?

Een eigen SkipPolicy maakt fijnmazige beslissingslogica mogelijk: skippen op basis van exception-type, foutaantal of specifieke businesscriteria.

AdaptiveSkipPolicy.javajava
// SkipPolicy met geavanceerde businesslogica
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {

    private static final int MAX_SKIP_COUNT = 100;
    private static final double MAX_SKIP_PERCENTAGE = 0.05;  // 5% maximum

    private final AtomicInteger totalProcessed = new AtomicInteger(0);
    private final AtomicInteger skipCount = new AtomicInteger(0);

    @Override
    public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
        // Fatale fouten nooit overslaan
        if (exception instanceof FatalBatchException
                || exception instanceof OutOfMemoryError) {
            return false;
        }

        // Absolute skip-limiet
        if (skipCountSoFar >= MAX_SKIP_COUNT) {
            return false;  // Stop de job
        }

        // Percentuele limiet
        int total = totalProcessed.get();
        if (total > 1000) {  // Pas pas toe na warmup
            double skipPercentage = (double) skipCountSoFar / total;
            if (skipPercentage > MAX_SKIP_PERCENTAGE) {
                return false;  // Te veel fouten naar verhouding
            }
        }

        // Skip validatie- en datafouten
        return exception instanceof ValidationException
                || exception instanceof DataFormatException
                || exception instanceof IllegalArgumentException;
    }

    // Aangeroepen door een listener om voortgang bij te houden
    public void incrementProcessed() {
        totalProcessed.incrementAndGet();
    }
}

Vraag 9: Hoe werkt het herstarten van een mislukte job?

Het JobRepository slaat de status van elke uitvoering op. Bij herstart identificeert Spring Batch de laatste gecommitte chunk en hervat vanaf dat punt. Succesvol verwerkte items worden niet opnieuw verwerkt.

JobRestartService.javajava
// Service voor het beheer van job-restarts
@Service
public class JobRestartService {

    private final JobLauncher jobLauncher;
    private final JobExplorer jobExplorer;
    private final JobRepository jobRepository;
    private final Job importJob;

    public JobRestartService(JobLauncher jobLauncher,
                              JobExplorer jobExplorer,
                              JobRepository jobRepository,
                              @Qualifier("importJob") Job importJob) {
        this.jobLauncher = jobLauncher;
        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
        this.importJob = importJob;
    }

    public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
        // Haalt de mislukte uitvoering op
        JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);

        if (failedExecution == null) {
            throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
        }

        // Verifieert dat de job kan worden herstart
        if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
            throw new IllegalStateException("Only FAILED jobs can be restarted");
        }

        // Gebruikt dezelfde parameters als de originele uitvoering
        JobParameters originalParams = failedExecution.getJobParameters();

        // Herstart de job - hervat automatisch vanaf laatste checkpoint
        return jobLauncher.run(importJob, originalParams);
    }

    public List<JobExecution> findRestartableJobs() {
        // Lijst alle FAILED-uitvoeringen die nog niet zijn herstart
        return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.FAILED)
                .filter(this::isRestartable)
                .toList();
    }

    private boolean isRestartable(JobExecution execution) {
        // Verifieert dat geen recentere succesvolle uitvoering bestaat
        JobInstance instance = execution.getJobInstance();
        return jobExplorer.getJobExecutions(instance).stream()
                .noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
    }
}
Interview-valkuil

Een job kan alleen worden herstart als de JobParameters identiek zijn. Een parameter wijzigen creëert een nieuwe job-instantie en gaat de voortgangsgeschiedenis verloren.

Schaling en optimalisatie

Vraag 10: Welke schaalstrategieën zijn beschikbaar?

Spring Batch biedt vier strategieën: multi-threaded step (meerdere threads lezen parallel), parallel steps (onafhankelijke steps parallel), remote chunking (gedistribueerde verwerking) en partitioning (gedistribueerde data).

MultiThreadedStepConfig.javajava
// Multi-threaded step: meerdere threads verwerken dezelfde dataset
@Configuration
public class MultiThreadedStepConfig {

    @Bean
    public Step multiThreadedStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<Record> reader,
                                   ItemProcessor<Record, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   TaskExecutor taskExecutor) {
        return new StepBuilder("multiThreadedStep", jobRepository)
                .<Record, ProcessedRecord>chunk(100, transactionManager)
                // LET OP: reader moet thread-safe zijn
                .reader(synchronizedReader(reader))
                .processor(processor)
                .writer(writer)
                // 4 threads verwerken chunks parallel
                .taskExecutor(taskExecutor)
                .throttleLimit(4)
                .build();
    }

    // Wrapper om de reader thread-safe te maken
    private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
        SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
        syncReader.setDelegate((ItemStreamReader<Record>) reader);
        return syncReader;
    }
}
ParallelStepsConfig.javajava
// Onafhankelijke steps parallel uitvoeren
@Configuration
public class ParallelStepsConfig {

    @Bean
    public Job parallelJob(JobRepository jobRepository,
                            Step loadCustomersStep,
                            Step loadProductsStep,
                            Step loadOrdersStep,
                            Step processDataStep) {
        // Parallelle flow: customers en products gelijktijdig geladen
        Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
                .start(loadCustomersStep)
                .build();

        Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
                .start(loadProductsStep)
                .build();

        Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
                .start(loadOrdersStep)
                .build();

        // Split voert flows parallel uit
        return new JobBuilder("parallelJob", jobRepository)
                .start(new FlowBuilder<Flow>("parallelLoadFlow")
                        .split(new SimpleAsyncTaskExecutor())
                        .add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
                        .build())
                // Na parallel laden, sequentiële verwerking
                .next(processDataStep)
                .build()
                .build();
    }
}

Multi-threading past wanneer de reader gesynchroniseerd kan worden. Partitioning is te verkiezen voor grote volumes omdat elke partitie een eigen reader heeft zonder contentie.

Vraag 11: Hoe monitor je de performance van een job?

Spring Batch ontsluit metrieken via listeners en JobRepository. Integratie met Micrometer maakt export mogelijk naar Prometheus, Grafana of andere monitoringsystemen.

BatchMetricsConfig.javajava
// Monitoringconfiguratie met Micrometer
@Configuration
public class BatchMetricsConfig {

    private final MeterRegistry meterRegistry;

    public BatchMetricsConfig(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Bean
    public JobExecutionListener metricsJobListener() {
        return new JobExecutionListener() {

            private Timer.Sample jobTimer;

            @Override
            public void beforeJob(JobExecution jobExecution) {
                // Start de duur-timer van de job
                jobTimer = Timer.start(meterRegistry);
                Counter.builder("batch.job.started")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .register(meterRegistry)
                        .increment();
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                // Registreert de totale duur
                jobTimer.stop(Timer.builder("batch.job.duration")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry));

                // Job-counter per status
                Counter.builder("batch.job.completed")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry)
                        .increment();
            }
        };
    }

    @Bean
    public StepExecutionListener metricsStepListener() {
        return new StepExecutionListener() {

            @Override
            public void afterStep(StepExecution stepExecution) {
                String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
                String stepName = stepExecution.getStepName();

                // Throughput-metrieken
                Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                return null;
            }
        };
    }
}

Vraag 12: Wat zijn veelvoorkomende valkuilen bij partitioning?

Veelvoorkomende fouten zijn: ongebalanceerde partities (één partitie bevat 90% van de data), niet thread-safe readers en onjuist statusbeheer tussen partities.

BalancedPartitioner.javajava
// Partitioner die de belasting daadwerkelijk balanceert
@Component
public class BalancedPartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Telt totaal aantal te verwerken items
        Integer totalCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);

        if (totalCount == null || totalCount == 0) {
            return Map.of();
        }

        // Berekent doelgrootte per partitie
        int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);

        Map<String, ExecutionContext> partitions = new HashMap<>();

        // Gebruikt OFFSET/LIMIT voor gebalanceerde partities
        // Duurder dan ranges maar garandeert balans
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("offset", i * itemsPerPartition);
            context.putInt("limit", itemsPerPartition);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

// OffsetBasedReader.java
// Reader compatibel met offset-gebaseerde partitioning
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {

    private final JdbcTemplate jdbcTemplate;
    private Iterator<OrderRecord> iterator;

    @Value("#{stepExecutionContext['offset']}")
    private int offset;

    @Value("#{stepExecutionContext['limit']}")
    private int limit;

    public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // Laadt exact het aan deze partitie toegewezen deel
        List<OrderRecord> records = jdbcTemplate.query(
                "SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
                new OrderRecordRowMapper(),
                limit, offset
        );
        this.iterator = records.iterator();
    }

    @Override
    public OrderRecord read() {
        return iterator.hasNext() ? iterator.next() : null;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // Statusopslag voor restart indien nodig
    }

    @Override
    public void close() {
        // Opruimen
    }
}

Geavanceerde vragen voor seniors

Vraag 13: Hoe beheer je afhankelijkheden tussen jobs?

Spring Batch beheert afhankelijkheden tussen jobs niet native. Oplossingen zijn: externe orchestrators (Airflow, Kubernetes CronJob) of een eigen implementatie met JobExplorer.

JobDependencyService.javajava
// Beheer van inter-job-afhankelijkheden
@Service
public class JobDependencyService {

    private final JobExplorer jobExplorer;
    private final JobLauncher jobLauncher;
    private final Map<String, Job> jobs;

    public JobDependencyService(JobExplorer jobExplorer,
                                  JobLauncher jobLauncher,
                                  Map<String, Job> jobs) {
        this.jobExplorer = jobExplorer;
        this.jobLauncher = jobLauncher;
        this.jobs = jobs;
    }

    public JobExecution runWithDependencies(String jobName,
                                             JobParameters params,
                                             List<String> dependsOn) throws Exception {
        // Verifieert dat alle afhankelijkheden zijn geslaagd
        for (String dependency : dependsOn) {
            if (!hasSuccessfulExecution(dependency, params)) {
                throw new JobExecutionException(
                        "Dependency not satisfied: " + dependency);
            }
        }

        Job job = jobs.get(jobName);
        if (job == null) {
            throw new IllegalArgumentException("Unknown job: " + jobName);
        }

        return jobLauncher.run(job, params);
    }

    private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
        // Zoekt een COMPLETED-uitvoering met dezelfde businessparameters
        return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
                .anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
    }

    private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
        // Vergelijkt businessparameters (negeert uitvoeringstimestamps)
        String actualDate = actual.getString("businessDate");
        String expectedDate = expected.getString("businessDate");
        return Objects.equals(actualDate, expectedDate);
    }
}

Vraag 14: Hoe test je een Spring Batch-job effectief?

Het testen van Spring Batch-jobs vereist een gelaagde aanpak: unittests voor componenten (reader, processor, writer), integratietests voor steps en end-to-end-tests voor complete jobs.

OrderProcessorTest.javajava
// Unittest van de processor
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {

    @Mock
    private PricingService pricingService;

    @Mock
    private ValidationService validationService;

    @InjectMocks
    private OrderItemProcessor processor;

    @Test
    void shouldProcessValidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(true);
        when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));

        // When
        ProcessedOrder result = processor.process(input);

        // Then
        assertThat(result).isNotNull();
        assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
    }

    @Test
    void shouldFilterInvalidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(false);

        // When
        ProcessedOrder result = processor.process(input);

        // Then - null betekent gefilterd
        assertThat(result).isNull();
        verify(pricingService, never()).calculatePrice(any());
    }
}
ImportJobIntegrationTest.javajava
// Integratietest van de complete job
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @BeforeEach
    void setup() {
        // Schoont metadata tussen tests op
        jobRepositoryTestUtils.removeJobExecutions();
        // Reset testdata
        jdbcTemplate.execute("DELETE FROM processed_orders");
        jdbcTemplate.execute("DELETE FROM orders");
    }

    @Test
    void shouldCompleteJobSuccessfully() throws Exception {
        // Given - testdata
        insertTestOrders(100);

        // When
        JobParameters params = new JobParametersBuilder()
                .addLocalDate("businessDate", LocalDate.now())
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncherTestUtils.launchJob(params);

        // Then
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(countProcessedOrders()).isEqualTo(100);
    }

    @Test
    void shouldHandleEmptyDataset() throws Exception {
        // Given - geen data

        // When
        JobExecution execution = jobLauncherTestUtils.launchJob();

        // Then - job slaagt ook zonder data
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    @Test
    void shouldRestartFromFailurePoint() throws Exception {
        // Given - simuleert fout halverwege verwerking
        insertTestOrders(100);
        insertPoisonOrder(50);  // Veroorzaakt een fout

        // When - eerste uitvoering faalt
        JobExecution firstExecution = jobLauncherTestUtils.launchJob();
        assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);

        // Corrigeer de data
        removePoisonOrder(50);

        // When - restart
        JobExecution restartExecution = jobLauncherTestUtils.launchJob(
                firstExecution.getJobParameters());

        // Then - hervat vanaf faalpunt
        assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    private void insertTestOrders(int count) {
        for (int i = 1; i <= count; i++) {
            jdbcTemplate.update(
                    "INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
                    i, i * 10, BigDecimal.valueOf(i * 10));
        }
    }

    private int countProcessedOrders() {
        return jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processed_orders", Integer.class);
    }
}

Vraag 15: Hoe optimaliseer je de schrijfperformance naar de database?

Schrijven wordt vaak het knelpunt. Optimalisaties zijn: JDBC batch inserts, constraints uitschakelen tijdens laden en gebruik van staging-tabellen.

OptimizedJdbcWriter.javajava
// Writer geoptimaliseerd voor grote volumes
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;
    private final DataSource dataSource;

    public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
        this.jdbcTemplate = jdbcTemplate;
        this.dataSource = dataSource;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
        List<? extends ProcessedOrder> items = chunk.getItems();

        if (items.isEmpty()) {
            return;
        }

        // Gebruikt PreparedStatement met batch
        try (Connection connection = dataSource.getConnection();
             PreparedStatement ps = connection.prepareStatement(
                     "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
                             "VALUES (?, ?, ?, ?)")) {

            for (ProcessedOrder order : items) {
                ps.setLong(1, order.orderId());
                ps.setLong(2, order.customerId());
                ps.setBigDecimal(3, order.finalPrice());
                ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                ps.addBatch();
            }

            // Voert alle inserts uit in één netwerkoperatie
            ps.executeBatch();
        }
    }
}

// StagingTableWriter.java
// Staging-table-patroon voor zeer grote volumes
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {

    private final JdbcTemplate jdbcTemplate;
    private String stagingTable;

    public StagingTableWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Maakt een tijdelijke tabel voor deze step
        stagingTable = "staging_orders_" + stepExecution.getId();
        jdbcTemplate.execute(
                "CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // Schrijft naar staging-tabel (zonder FK-constraints)
        String sql = "INSERT INTO " + stagingTable +
                " (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";

        jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                });
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
            // Bulk copy naar de uiteindelijke tabel
            jdbcTemplate.execute(
                    "INSERT INTO processed_orders SELECT * FROM " + stagingTable);
        }
        // Schoont staging-tabel op
        jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
        return stepExecution.getExitStatus();
    }
}

Conclusie

Beheersing van Spring Batch 5 in technische interviews leunt op diepgaand begrip van de interne mechanismen:

Architectuur: Job → Step → Chunk (Reader, Processor, Writer)

Chunk-verwerking: dimensionering, levenscyclus, transacties

Partitioning: lokaal vs remote, partitiebalans

Fault tolerance: skip, retry, restart met passende policy

Schaling: multi-threading, parallel steps, remote chunking

Tests: unit, integratie, end-to-end

Optimalisatie: batch writes, staging-tabellen, monitoring

Geavanceerde vragen toetsen het vermogen architectuurkeuzes te onderbouwen op basis van de context: datavolume, tijdsdruk, fouttolerantie en beschikbare infrastructuur.

Begin met oefenen!

Test je kennis met onze gespreksimulatoren en technische tests.

Tags

#spring batch
#spring boot
#java
#batch processing
#interview questions

Delen

Gerelateerde artikelen