Spring Batch 5 en entretien technique : partitioning, chunks et fault tolerance

Préparez vos entretiens Spring Batch 5 : 15 questions essentielles sur le partitioning, chunk-oriented processing, fault tolerance avec exemples de code Java 21.

Spring Batch 5 Interview : partitioning, chunks et fault tolerance

Spring Batch 5 représente un pilier du traitement de données massives dans l'écosystème Spring. Les entretiens techniques évaluent la capacité à concevoir des jobs robustes, scalables et tolérants aux pannes. Maîtriser le partitioning, le chunk-oriented processing et les mécanismes de fault tolerance distingue les développeurs seniors.

Point clé pour l'entretien

Les recruteurs testent la compréhension profonde : pourquoi choisir le partitioning plutôt que le remote chunking ? Comment dimensionner la taille des chunks ? Ces décisions architecturales révèlent l'expérience réelle en production.

Architecture fondamentale de Spring Batch 5

Question 1 : Quels sont les composants principaux de Spring Batch ?

L'architecture de Spring Batch repose sur trois couches distinctes : l'application (jobs et code métier), le Batch Core (classes runtime pour lancer et contrôler les jobs), et l'infrastructure (readers, writers et services communs comme RetryTemplate).

BatchJobConfig.javajava
// Configuration d'un job Spring Batch 5 avec Java 21
@Configuration
public class BatchJobConfig {

    // JobRepository stocke les métadonnées d'exécution
    // Permet le restart et le monitoring des jobs
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public BatchJobConfig(JobRepository jobRepository,
                          PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Un Job encapsule le processus batch complet
    // Composé d'un ou plusieurs Steps exécutés séquentiellement
    @Bean
    public Job importUserJob(Step processUsersStep, Step cleanupStep) {
        return new JobBuilder("importUserJob", jobRepository)
                .start(processUsersStep)      // Step principal de traitement
                .next(cleanupStep)             // Step de nettoyage
                .build();
    }

    // Un Step représente une unité de travail indépendante
    // Deux modèles : Tasklet (tâche unique) ou Chunk (traitement itératif)
    @Bean
    public Step processUsersStep(ItemReader<UserRecord> reader,
                                  ItemProcessor<UserRecord, User> processor,
                                  ItemWriter<User> writer) {
        return new StepBuilder("processUsersStep", jobRepository)
                .<UserRecord, User>chunk(100, transactionManager)  // Commit tous les 100 items
                .reader(reader)       // Lit les données source
                .processor(processor) // Transforme chaque item
                .writer(writer)       // Écrit par batch de 100
                .build();
    }
}

Le JobRepository persiste l'état des exécutions en base de données. Cette persistance permet de redémarrer un job échoué exactement là où il s'est arrêté, sans retraiter les données déjà validées.

Question 2 : Quelle est la différence entre Tasklet et Chunk-oriented processing ?

Tasklet exécute une action discrète et non répétitive : suppression de fichiers, appel de procédure stockée, envoi d'email de notification. Chunk traite des volumes massifs en découpant les données en lots gérables.

CleanupTasklet.javajava
// Tasklet : action unique sans itération
@Component
public class CleanupTasklet implements Tasklet {

    private final Path tempDirectory = Path.of("/tmp/batch-work");

    @Override
    public RepeatStatus execute(StepContribution contribution,
                                 ChunkContext chunkContext) throws Exception {
        // Supprime tous les fichiers temporaires du traitement
        try (var files = Files.walk(tempDirectory)) {
            files.filter(Files::isRegularFile)
                 .forEach(this::deleteQuietly);
        }

        // FINISHED indique que le tasklet a terminé son travail
        // CONTINUABLE relancerait l'exécution (utile pour polling)
        return RepeatStatus.FINISHED;
    }

    private void deleteQuietly(Path file) {
        try {
            Files.delete(file);
        } catch (IOException e) {
            // Log et continue - ne pas faire échouer le job pour un fichier
        }
    }
}
ChunkProcessingConfig.javajava
// Chunk-oriented : traitement de gros volumes
@Configuration
public class ChunkProcessingConfig {

    @Bean
    public Step processOrdersStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<OrderRecord> reader,
                                   ItemProcessor<OrderRecord, ProcessedOrder> processor,
                                   ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("processOrdersStep", jobRepository)
                // Chunk de 500 : lit 500 items, traite, écrit, puis commit
                .<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Listener pour monitorer la progression
                .listener(new ChunkProgressListener())
                .build();
    }
}

Le chunk-oriented processing offre des avantages critiques : gestion mémoire optimisée (seul le chunk courant est en mémoire), transactions granulaires (commit par chunk), et reprise après échec au niveau du dernier chunk validé.

Chunk-Oriented Processing en profondeur

Question 3 : Comment fonctionne le cycle de vie d'un chunk ?

Chaque chunk suit un cycle précis : lecture item par item jusqu'à atteindre la taille configurée, traitement individuel de chaque item, puis écriture groupée. Une transaction englobe l'ensemble du chunk.

OrderItemReader.javajava
// ItemReader : lit un item à la fois
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {

    // @StepScope : nouvelle instance par exécution de step
    // Permet d'injecter des paramètres de job dynamiques
    @Value("#{jobParameters['startDate']}")
    private LocalDate startDate;

    private Iterator<OrderRecord> orderIterator;

    @BeforeStep
    public void initializeReader(StepExecution stepExecution) {
        // Charge les données au démarrage du step
        List<OrderRecord> orders = fetchOrdersFromDate(startDate);
        this.orderIterator = orders.iterator();
    }

    @Override
    public OrderRecord read() {
        // Retourne null pour signaler la fin des données
        // Spring Batch appelle read() jusqu'à recevoir null
        if (orderIterator.hasNext()) {
            return orderIterator.next();
        }
        return null;  // Fin du dataset
    }

    private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
        // Récupération depuis la source de données
        return List.of();  // Implémentation réelle
    }
}
OrderItemProcessor.javajava
// ItemProcessor : transforme chaque item individuellement
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {

    private final PricingService pricingService;
    private final ValidationService validationService;

    public OrderItemProcessor(PricingService pricingService,
                               ValidationService validationService) {
        this.pricingService = pricingService;
        this.validationService = validationService;
    }

    @Override
    public ProcessedOrder process(OrderRecord item) {
        // Retourner null filtre l'item (ne sera pas écrit)
        if (!validationService.isValid(item)) {
            return null;  // Item filtré
        }

        // Transformation métier
        BigDecimal finalPrice = pricingService.calculatePrice(item);

        return new ProcessedOrder(
                item.orderId(),
                item.customerId(),
                finalPrice,
                LocalDateTime.now()
        );
    }
}
OrderItemWriter.javajava
// ItemWriter : écrit le chunk complet en une opération
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;

    public OrderItemWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // Le chunk contient tous les items traités
        // Écriture en batch pour optimiser les performances
        List<? extends ProcessedOrder> items = chunk.getItems();

        jdbcTemplate.batchUpdate(
                "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
                items,
                items.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                }
        );
    }
}

Si une exception survient pendant le traitement d'un chunk, la transaction est rollback. Le job peut ensuite reprendre depuis ce chunk grâce aux métadonnées stockées dans le JobRepository.

Question 4 : Comment choisir la taille optimale d'un chunk ?

La taille du chunk impacte directement les performances et la consommation mémoire. Un chunk trop petit multiplie les commits (overhead). Un chunk trop grand consomme trop de mémoire et allonge les rollbacks en cas d'erreur.

ChunkSizingConfig.javajava
// Configuration dynamique de la taille de chunk
@Configuration
public class ChunkSizingConfig {

    // Taille par défaut raisonnable pour la plupart des cas
    private static final int DEFAULT_CHUNK_SIZE = 100;

    // Pour des items légers (quelques champs)
    private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;

    // Pour des items volumineux (blobs, documents)
    private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;

    @Bean
    public Step processLightDataStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<LightRecord> reader,
                                      ItemWriter<LightRecord> writer) {
        return new StepBuilder("processLightDataStep", jobRepository)
                // Items légers : chunks plus grands pour moins de commits
                .<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .writer(writer)
                .build();
    }

    @Bean
    public Step processDocumentsStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<Document> reader,
                                      ItemProcessor<Document, ProcessedDocument> processor,
                                      ItemWriter<ProcessedDocument> writer) {
        return new StepBuilder("processDocumentsStep", jobRepository)
                // Documents volumineux : chunks petits pour limiter la mémoire
                .<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
Règle empirique

Commencer avec 100 items par chunk, puis ajuster selon les métriques : temps de commit, utilisation mémoire, et durée de rollback. Monitorer avec des listeners pour identifier le sweet spot.

Partitioning pour le traitement parallèle

Question 5 : Qu'est-ce que le partitioning et quand l'utiliser ?

Le partitioning divise un dataset en partitions indépendantes traitées en parallèle. Chaque partition s'exécute dans son propre thread (local) ou sur un worker distant. Cette approche multiplie le throughput sans sacrifier la restartabilité.

PartitionedJobConfig.javajava
// Configuration d'un job partitionné
@Configuration
public class PartitionedJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public PartitionedJobConfig(JobRepository jobRepository,
                                 PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    @Bean
    public Job partitionedImportJob(Step partitionedStep) {
        return new JobBuilder("partitionedImportJob", jobRepository)
                .start(partitionedStep)
                .build();
    }

    // Step manager : orchestre les partitions
    @Bean
    public Step partitionedStep(Partitioner partitioner,
                                 Step workerStep,
                                 TaskExecutor taskExecutor) {
        return new StepBuilder("partitionedStep", jobRepository)
                // Divise le travail via le Partitioner
                .partitioner("workerStep", partitioner)
                // Step exécuté pour chaque partition
                .step(workerStep)
                // 8 threads parallèles
                .taskExecutor(taskExecutor)
                // Nombre de partitions à créer
                .gridSize(8)
                .build();
    }

    // TaskExecutor pour l'exécution parallèle
    @Bean
    public TaskExecutor batchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("batch-partition-");
        executor.initialize();
        return executor;
    }
}
RangePartitioner.javajava
// Partitioner basé sur des plages d'IDs
@Component
public class RangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public RangePartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Récupère les bornes du dataset
        Long minId = jdbcTemplate.queryForObject(
                "SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
        Long maxId = jdbcTemplate.queryForObject(
                "SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);

        if (minId == null || maxId == null) {
            return Map.of();  // Pas de données à traiter
        }

        // Calcule la taille de chaque partition
        long range = (maxId - minId) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new HashMap<>();

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            long start = minId + (i * range);
            long end = Math.min(start + range - 1, maxId);

            // Chaque partition reçoit ses bornes
            context.putLong("minId", start);
            context.putLong("maxId", end);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

Le partitioning convient aux datasets volumineux où les items sont indépendants. Les partitions doivent être équilibrées pour éviter qu'une partition lente ne ralentisse l'ensemble du job.

Question 6 : Quelle différence entre partitioning local et remote ?

Le partitioning local exécute toutes les partitions sur la même JVM avec un pool de threads. Le remote partitioning distribue les partitions sur plusieurs JVMs (workers) via un middleware de messaging.

RemotePartitioningConfig.javajava
// Configuration remote partitioning avec messaging
@Configuration
public class RemotePartitioningConfig {

    @Bean
    public Step managerStep(JobRepository jobRepository,
                             Partitioner partitioner,
                             MessageChannelPartitionHandler partitionHandler) {
        return new StepBuilder("managerStep", jobRepository)
                .partitioner("workerStep", partitioner)
                // Handler qui communique avec les workers distants
                .partitionHandler(partitionHandler)
                .build();
    }

    // PartitionHandler envoie les ExecutionContext aux workers
    @Bean
    public MessageChannelPartitionHandler partitionHandler(
            MessagingTemplate messagingTemplate,
            JobExplorer jobExplorer) {
        MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
        handler.setStepName("workerStep");
        handler.setGridSize(4);
        handler.setMessagingOperations(messagingTemplate);
        handler.setJobExplorer(jobExplorer);
        // Timeout pour attendre la fin des workers
        handler.setPollInterval(5000L);
        return handler;
    }
}
WorkerConfiguration.javajava
// Configuration côté worker
@Configuration
public class WorkerConfiguration {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public WorkerConfiguration(JobRepository jobRepository,
                                PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Le worker reçoit les partitions et exécute le step
    @Bean
    public Step workerStep(ItemReader<OrderRecord> reader,
                            ItemProcessor<OrderRecord, ProcessedOrder> processor,
                            ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("workerStep", jobRepository)
                .<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
                // Reader configuré en @StepScope pour recevoir les paramètres de partition
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    // Reader qui utilise les bornes de la partition
    @Bean
    @StepScope
    public JdbcCursorItemReader<OrderRecord> partitionedReader(
            DataSource dataSource,
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {
        return new JdbcCursorItemReaderBuilder<OrderRecord>()
                .name("partitionedOrderReader")
                .dataSource(dataSource)
                .sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
                .preparedStatementSetter(ps -> {
                    ps.setLong(1, minId);
                    ps.setLong(2, maxId);
                })
                .rowMapper(new OrderRecordRowMapper())
                .build();
    }
}

Prêt à réussir tes entretiens Spring Boot ?

Entraîne-toi avec nos simulateurs interactifs, fiches express et tests techniques.

Fault Tolerance et récupération d'erreurs

Question 7 : Quels mécanismes de fault tolerance offre Spring Batch ?

Spring Batch propose trois mécanismes complémentaires : skip (ignorer les items en erreur), retry (réessayer automatiquement), et restart (reprendre un job échoué). Ces mécanismes se configurent au niveau du step.

FaultTolerantStepConfig.javajava
// Configuration complète de fault tolerance
@Configuration
public class FaultTolerantStepConfig {

    @Bean
    public Step faultTolerantStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<DataRecord> reader,
                                   ItemProcessor<DataRecord, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   SkipPolicy customSkipPolicy) {
        return new StepBuilder("faultTolerantStep", jobRepository)
                .<DataRecord, ProcessedRecord>chunk(100, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Active le mode fault tolerant
                .faultTolerant()
                // SKIP : ignore jusqu'à 10 erreurs de validation
                .skipLimit(10)
                .skip(ValidationException.class)
                .skip(DataIntegrityViolationException.class)
                // Certaines erreurs ne doivent jamais être skippées
                .noSkip(FatalBatchException.class)
                // RETRY : réessaie les erreurs transitoires
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .retry(DeadlockLoserDataAccessException.class)
                // Backoff exponentiel entre les retries
                .backOffPolicy(exponentialBackOffPolicy())
                // Listener pour logger les skips
                .listener(skipListener())
                .build();
    }

    @Bean
    public BackOffPolicy exponentialBackOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);  // 1 seconde
        policy.setMultiplier(2.0);         // Double à chaque retry
        policy.setMaxInterval(10000);      // Max 10 secondes
        return policy;
    }

    @Bean
    public SkipListener<DataRecord, ProcessedRecord> skipListener() {
        return new SkipListener<>() {
            @Override
            public void onSkipInRead(Throwable t) {
                // Log item non lisible
            }

            @Override
            public void onSkipInProcess(DataRecord item, Throwable t) {
                // Log item qui a échoué au processing
            }

            @Override
            public void onSkipInWrite(ProcessedRecord item, Throwable t) {
                // Log item qui a échoué à l'écriture
            }
        };
    }
}

Le retry convient aux erreurs transitoires (timeout réseau, deadlock base de données). Le skip convient aux erreurs de données individuelles qui ne doivent pas bloquer le traitement global.

Question 8 : Comment implémenter une SkipPolicy personnalisée ?

Une SkipPolicy personnalisée permet une logique de décision fine : skipper selon le type d'exception, le nombre d'erreurs, ou des critères métier spécifiques.

AdaptiveSkipPolicy.javajava
// SkipPolicy avec logique métier avancée
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {

    private static final int MAX_SKIP_COUNT = 100;
    private static final double MAX_SKIP_PERCENTAGE = 0.05;  // 5% max

    private final AtomicInteger totalProcessed = new AtomicInteger(0);
    private final AtomicInteger skipCount = new AtomicInteger(0);

    @Override
    public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
        // Jamais skipper les erreurs fatales
        if (exception instanceof FatalBatchException
                || exception instanceof OutOfMemoryError) {
            return false;
        }

        // Limite absolue de skips
        if (skipCountSoFar >= MAX_SKIP_COUNT) {
            return false;  // Arrête le job
        }

        // Limite en pourcentage
        int total = totalProcessed.get();
        if (total > 1000) {  // Applique seulement après warmup
            double skipPercentage = (double) skipCountSoFar / total;
            if (skipPercentage > MAX_SKIP_PERCENTAGE) {
                return false;  // Trop d'erreurs proportionnellement
            }
        }

        // Skipper les erreurs de validation et données
        return exception instanceof ValidationException
                || exception instanceof DataFormatException
                || exception instanceof IllegalArgumentException;
    }

    // Appelé par un listener pour tracker la progression
    public void incrementProcessed() {
        totalProcessed.incrementAndGet();
    }
}

Question 9 : Comment fonctionne le restart d'un job échoué ?

Le JobRepository stocke l'état de chaque exécution. Lors d'un restart, Spring Batch identifie le dernier chunk committé et reprend depuis ce point. Les items déjà traités avec succès ne sont pas retraités.

JobRestartService.javajava
// Service de gestion des restarts
@Service
public class JobRestartService {

    private final JobLauncher jobLauncher;
    private final JobExplorer jobExplorer;
    private final JobRepository jobRepository;
    private final Job importJob;

    public JobRestartService(JobLauncher jobLauncher,
                              JobExplorer jobExplorer,
                              JobRepository jobRepository,
                              @Qualifier("importJob") Job importJob) {
        this.jobLauncher = jobLauncher;
        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
        this.importJob = importJob;
    }

    public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
        // Récupère l'exécution échouée
        JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);

        if (failedExecution == null) {
            throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
        }

        // Vérifie que le job peut être redémarré
        if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
            throw new IllegalStateException("Only FAILED jobs can be restarted");
        }

        // Utilise les mêmes paramètres que l'exécution originale
        JobParameters originalParams = failedExecution.getJobParameters();

        // Relance le job - reprend automatiquement depuis le dernier checkpoint
        return jobLauncher.run(importJob, originalParams);
    }

    public List<JobExecution> findRestartableJobs() {
        // Liste toutes les exécutions FAILED non encore restartées
        return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.FAILED)
                .filter(this::isRestartable)
                .toList();
    }

    private boolean isRestartable(JobExecution execution) {
        // Vérifie qu'il n'y a pas d'exécution plus récente réussie
        JobInstance instance = execution.getJobInstance();
        return jobExplorer.getJobExecutions(instance).stream()
                .noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
    }
}
Piège d'entretien

Un job ne peut être restarté que si les JobParameters sont identiques. Modifier un paramètre crée une nouvelle instance de job, perdant ainsi l'historique de progression.

Scaling et optimisation

Question 10 : Quelles sont les stratégies de scaling disponibles ?

Spring Batch propose quatre stratégies : multi-threaded step (plusieurs threads lisent en parallèle), parallel steps (steps indépendants en parallèle), remote chunking (processing distribué), et partitioning (données distribuées).

MultiThreadedStepConfig.javajava
// Step multi-threadé : plusieurs threads traitent le même dataset
@Configuration
public class MultiThreadedStepConfig {

    @Bean
    public Step multiThreadedStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<Record> reader,
                                   ItemProcessor<Record, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   TaskExecutor taskExecutor) {
        return new StepBuilder("multiThreadedStep", jobRepository)
                .<Record, ProcessedRecord>chunk(100, transactionManager)
                // ATTENTION : le reader doit être thread-safe
                .reader(synchronizedReader(reader))
                .processor(processor)
                .writer(writer)
                // 4 threads traitent les chunks en parallèle
                .taskExecutor(taskExecutor)
                .throttleLimit(4)
                .build();
    }

    // Wrapper pour rendre le reader thread-safe
    private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
        SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
        syncReader.setDelegate((ItemStreamReader<Record>) reader);
        return syncReader;
    }
}
ParallelStepsConfig.javajava
// Exécution de steps indépendants en parallèle
@Configuration
public class ParallelStepsConfig {

    @Bean
    public Job parallelJob(JobRepository jobRepository,
                            Step loadCustomersStep,
                            Step loadProductsStep,
                            Step loadOrdersStep,
                            Step processDataStep) {
        // Flow parallèle : customers et products chargés simultanément
        Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
                .start(loadCustomersStep)
                .build();

        Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
                .start(loadProductsStep)
                .build();

        Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
                .start(loadOrdersStep)
                .build();

        // Split exécute les flows en parallèle
        return new JobBuilder("parallelJob", jobRepository)
                .start(new FlowBuilder<Flow>("parallelLoadFlow")
                        .split(new SimpleAsyncTaskExecutor())
                        .add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
                        .build())
                // Après le chargement parallèle, traitement séquentiel
                .next(processDataStep)
                .build()
                .build();
    }
}

Le multi-threading convient quand le reader peut être synchronisé. Le partitioning est préférable pour les gros volumes car chaque partition a son propre reader sans contention.

Question 11 : Comment monitorer les performances d'un job ?

Spring Batch expose des métriques via les listeners et le JobRepository. L'intégration avec Micrometer permet l'export vers Prometheus, Grafana ou d'autres systèmes de monitoring.

BatchMetricsConfig.javajava
// Configuration du monitoring avec Micrometer
@Configuration
public class BatchMetricsConfig {

    private final MeterRegistry meterRegistry;

    public BatchMetricsConfig(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Bean
    public JobExecutionListener metricsJobListener() {
        return new JobExecutionListener() {

            private Timer.Sample jobTimer;

            @Override
            public void beforeJob(JobExecution jobExecution) {
                // Démarre le timer de durée du job
                jobTimer = Timer.start(meterRegistry);
                Counter.builder("batch.job.started")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .register(meterRegistry)
                        .increment();
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                // Enregistre la durée totale
                jobTimer.stop(Timer.builder("batch.job.duration")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry));

                // Compteur de jobs par statut
                Counter.builder("batch.job.completed")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry)
                        .increment();
            }
        };
    }

    @Bean
    public StepExecutionListener metricsStepListener() {
        return new StepExecutionListener() {

            @Override
            public void afterStep(StepExecution stepExecution) {
                String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
                String stepName = stepExecution.getStepName();

                // Métriques de throughput
                Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                return null;
            }
        };
    }
}

Question 12 : Quels sont les pièges courants avec le partitioning ?

Les erreurs fréquentes incluent : partitions déséquilibrées (une partition contient 90% des données), readers non thread-safe, et gestion incorrecte de l'état entre partitions.

BalancedPartitioner.javajava
// Partitioner qui équilibre réellement la charge
@Component
public class BalancedPartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Compte le nombre total d'items à traiter
        Integer totalCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);

        if (totalCount == null || totalCount == 0) {
            return Map.of();
        }

        // Calcule la taille cible par partition
        int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);

        Map<String, ExecutionContext> partitions = new HashMap<>();

        // Utilise OFFSET/LIMIT pour des partitions équilibrées
        // Plus coûteux que les ranges mais garantit l'équilibre
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("offset", i * itemsPerPartition);
            context.putInt("limit", itemsPerPartition);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

// OffsetBasedReader.java
// Reader compatible avec le partitioning par offset
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {

    private final JdbcTemplate jdbcTemplate;
    private Iterator<OrderRecord> iterator;

    @Value("#{stepExecutionContext['offset']}")
    private int offset;

    @Value("#{stepExecutionContext['limit']}")
    private int limit;

    public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // Charge exactement la portion assignée à cette partition
        List<OrderRecord> records = jdbcTemplate.query(
                "SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
                new OrderRecordRowMapper(),
                limit, offset
        );
        this.iterator = records.iterator();
    }

    @Override
    public OrderRecord read() {
        return iterator.hasNext() ? iterator.next() : null;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // Sauvegarde de l'état pour restart si nécessaire
    }

    @Override
    public void close() {
        // Cleanup
    }
}

Questions avancées pour seniors

Question 13 : Comment gérer les dépendances entre jobs ?

Spring Batch ne gère pas nativement les dépendances inter-jobs. Les solutions incluent : orchestrateurs externes (Airflow, Kubernetes CronJob), ou implémentation custom avec JobExplorer.

JobDependencyService.javajava
// Gestion des dépendances entre jobs
@Service
public class JobDependencyService {

    private final JobExplorer jobExplorer;
    private final JobLauncher jobLauncher;
    private final Map<String, Job> jobs;

    public JobDependencyService(JobExplorer jobExplorer,
                                  JobLauncher jobLauncher,
                                  Map<String, Job> jobs) {
        this.jobExplorer = jobExplorer;
        this.jobLauncher = jobLauncher;
        this.jobs = jobs;
    }

    public JobExecution runWithDependencies(String jobName,
                                             JobParameters params,
                                             List<String> dependsOn) throws Exception {
        // Vérifie que toutes les dépendances ont réussi
        for (String dependency : dependsOn) {
            if (!hasSuccessfulExecution(dependency, params)) {
                throw new JobExecutionException(
                        "Dependency not satisfied: " + dependency);
            }
        }

        Job job = jobs.get(jobName);
        if (job == null) {
            throw new IllegalArgumentException("Unknown job: " + jobName);
        }

        return jobLauncher.run(job, params);
    }

    private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
        // Cherche une exécution COMPLETED avec les mêmes paramètres métier
        return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
                .anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
    }

    private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
        // Compare les paramètres métier (ignore les timestamps d'exécution)
        String actualDate = actual.getString("businessDate");
        String expectedDate = expected.getString("businessDate");
        return Objects.equals(actualDate, expectedDate);
    }
}

Question 14 : Comment tester efficacement un job Spring Batch ?

Les tests de jobs Spring Batch requièrent une approche en couches : tests unitaires des composants (reader, processor, writer), tests d'intégration des steps, et tests end-to-end du job complet.

OrderProcessorTest.javajava
// Test unitaire du processor
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {

    @Mock
    private PricingService pricingService;

    @Mock
    private ValidationService validationService;

    @InjectMocks
    private OrderItemProcessor processor;

    @Test
    void shouldProcessValidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(true);
        when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));

        // When
        ProcessedOrder result = processor.process(input);

        // Then
        assertThat(result).isNotNull();
        assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
    }

    @Test
    void shouldFilterInvalidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(false);

        // When
        ProcessedOrder result = processor.process(input);

        // Then - null signifie filtré
        assertThat(result).isNull();
        verify(pricingService, never()).calculatePrice(any());
    }
}
ImportJobIntegrationTest.javajava
// Test d'intégration du job complet
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @BeforeEach
    void setup() {
        // Nettoie les métadonnées entre les tests
        jobRepositoryTestUtils.removeJobExecutions();
        // Réinitialise les données de test
        jdbcTemplate.execute("DELETE FROM processed_orders");
        jdbcTemplate.execute("DELETE FROM orders");
    }

    @Test
    void shouldCompleteJobSuccessfully() throws Exception {
        // Given - données de test
        insertTestOrders(100);

        // When
        JobParameters params = new JobParametersBuilder()
                .addLocalDate("businessDate", LocalDate.now())
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncherTestUtils.launchJob(params);

        // Then
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(countProcessedOrders()).isEqualTo(100);
    }

    @Test
    void shouldHandleEmptyDataset() throws Exception {
        // Given - pas de données

        // When
        JobExecution execution = jobLauncherTestUtils.launchJob();

        // Then - le job réussit même sans données
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    @Test
    void shouldRestartFromFailurePoint() throws Exception {
        // Given - simule une erreur à mi-chemin
        insertTestOrders(100);
        insertPoisonOrder(50);  // Provoque une erreur

        // When - première exécution échoue
        JobExecution firstExecution = jobLauncherTestUtils.launchJob();
        assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);

        // Corrige les données
        removePoisonOrder(50);

        // When - restart
        JobExecution restartExecution = jobLauncherTestUtils.launchJob(
                firstExecution.getJobParameters());

        // Then - reprend depuis le point d'échec
        assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    private void insertTestOrders(int count) {
        for (int i = 1; i <= count; i++) {
            jdbcTemplate.update(
                    "INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
                    i, i * 10, BigDecimal.valueOf(i * 10));
        }
    }

    private int countProcessedOrders() {
        return jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processed_orders", Integer.class);
    }
}

Question 15 : Comment optimiser les performances d'écriture en base ?

L'écriture représente souvent le goulot d'étranglement. Les optimisations incluent : batch inserts JDBC, désactivation des contraintes pendant le chargement, et utilisation de tables temporaires.

OptimizedJdbcWriter.javajava
// Writer optimisé pour les gros volumes
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;
    private final DataSource dataSource;

    public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
        this.jdbcTemplate = jdbcTemplate;
        this.dataSource = dataSource;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
        List<? extends ProcessedOrder> items = chunk.getItems();

        if (items.isEmpty()) {
            return;
        }

        // Utilise un PreparedStatement avec batch
        try (Connection connection = dataSource.getConnection();
             PreparedStatement ps = connection.prepareStatement(
                     "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
                             "VALUES (?, ?, ?, ?)")) {

            for (ProcessedOrder order : items) {
                ps.setLong(1, order.orderId());
                ps.setLong(2, order.customerId());
                ps.setBigDecimal(3, order.finalPrice());
                ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                ps.addBatch();
            }

            // Exécute tous les inserts en une seule opération réseau
            ps.executeBatch();
        }
    }
}

// StagingTableWriter.java
// Pattern staging table pour les très gros volumes
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {

    private final JdbcTemplate jdbcTemplate;
    private String stagingTable;

    public StagingTableWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Crée une table temporaire pour ce step
        stagingTable = "staging_orders_" + stepExecution.getId();
        jdbcTemplate.execute(
                "CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // Écrit dans la staging table (sans contraintes FK)
        String sql = "INSERT INTO " + stagingTable +
                " (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";

        jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                });
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
            // Copie en masse vers la table finale
            jdbcTemplate.execute(
                    "INSERT INTO processed_orders SELECT * FROM " + stagingTable);
        }
        // Nettoie la staging table
        jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
        return stepExecution.getExitStatus();
    }
}

Conclusion

La maîtrise de Spring Batch 5 en entretien technique repose sur la compréhension profonde des mécanismes internes :

Architecture : Job → Step → Chunk (Reader, Processor, Writer)

Chunk processing : dimensionnement, cycle de vie, transactions

Partitioning : local vs remote, équilibrage des partitions

Fault tolerance : skip, retry, restart avec politique adaptée

Scaling : multi-threading, parallel steps, remote chunking

Testing : unitaire, intégration, end-to-end

Optimisation : batch writes, staging tables, monitoring

Les questions avancées testent la capacité à justifier des choix architecturaux selon le contexte : volume de données, contraintes de temps, tolérance aux erreurs, et infrastructure disponible.

Passe à la pratique !

Teste tes connaissances avec nos simulateurs d'entretien et tests techniques.

Tags

#spring batch
#spring boot
#java
#batch processing
#entretien technique

Partager

Articles similaires