Spring Batch 5 en entretien technique : partitioning, chunks et fault tolerance
Préparez vos entretiens Spring Batch 5 : 15 questions essentielles sur le partitioning, chunk-oriented processing, fault tolerance avec exemples de code Java 21.

Spring Batch 5 représente un pilier du traitement de données massives dans l'écosystème Spring. Les entretiens techniques évaluent la capacité à concevoir des jobs robustes, scalables et tolérants aux pannes. Maîtriser le partitioning, le chunk-oriented processing et les mécanismes de fault tolerance distingue les développeurs seniors.
Les recruteurs testent la compréhension profonde : pourquoi choisir le partitioning plutôt que le remote chunking ? Comment dimensionner la taille des chunks ? Ces décisions architecturales révèlent l'expérience réelle en production.
Architecture fondamentale de Spring Batch 5
Question 1 : Quels sont les composants principaux de Spring Batch ?
L'architecture de Spring Batch repose sur trois couches distinctes : l'application (jobs et code métier), le Batch Core (classes runtime pour lancer et contrôler les jobs), et l'infrastructure (readers, writers et services communs comme RetryTemplate).
// Configuration d'un job Spring Batch 5 avec Java 21
@Configuration
public class BatchJobConfig {
// JobRepository stocke les métadonnées d'exécution
// Permet le restart et le monitoring des jobs
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public BatchJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Un Job encapsule le processus batch complet
// Composé d'un ou plusieurs Steps exécutés séquentiellement
@Bean
public Job importUserJob(Step processUsersStep, Step cleanupStep) {
return new JobBuilder("importUserJob", jobRepository)
.start(processUsersStep) // Step principal de traitement
.next(cleanupStep) // Step de nettoyage
.build();
}
// Un Step représente une unité de travail indépendante
// Deux modèles : Tasklet (tâche unique) ou Chunk (traitement itératif)
@Bean
public Step processUsersStep(ItemReader<UserRecord> reader,
ItemProcessor<UserRecord, User> processor,
ItemWriter<User> writer) {
return new StepBuilder("processUsersStep", jobRepository)
.<UserRecord, User>chunk(100, transactionManager) // Commit tous les 100 items
.reader(reader) // Lit les données source
.processor(processor) // Transforme chaque item
.writer(writer) // Écrit par batch de 100
.build();
}
}Le JobRepository persiste l'état des exécutions en base de données. Cette persistance permet de redémarrer un job échoué exactement là où il s'est arrêté, sans retraiter les données déjà validées.
Question 2 : Quelle est la différence entre Tasklet et Chunk-oriented processing ?
Tasklet exécute une action discrète et non répétitive : suppression de fichiers, appel de procédure stockée, envoi d'email de notification. Chunk traite des volumes massifs en découpant les données en lots gérables.
// Tasklet : action unique sans itération
@Component
public class CleanupTasklet implements Tasklet {
private final Path tempDirectory = Path.of("/tmp/batch-work");
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// Supprime tous les fichiers temporaires du traitement
try (var files = Files.walk(tempDirectory)) {
files.filter(Files::isRegularFile)
.forEach(this::deleteQuietly);
}
// FINISHED indique que le tasklet a terminé son travail
// CONTINUABLE relancerait l'exécution (utile pour polling)
return RepeatStatus.FINISHED;
}
private void deleteQuietly(Path file) {
try {
Files.delete(file);
} catch (IOException e) {
// Log et continue - ne pas faire échouer le job pour un fichier
}
}
}// Chunk-oriented : traitement de gros volumes
@Configuration
public class ChunkProcessingConfig {
@Bean
public Step processOrdersStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("processOrdersStep", jobRepository)
// Chunk de 500 : lit 500 items, traite, écrit, puis commit
.<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Listener pour monitorer la progression
.listener(new ChunkProgressListener())
.build();
}
}Le chunk-oriented processing offre des avantages critiques : gestion mémoire optimisée (seul le chunk courant est en mémoire), transactions granulaires (commit par chunk), et reprise après échec au niveau du dernier chunk validé.
Chunk-Oriented Processing en profondeur
Question 3 : Comment fonctionne le cycle de vie d'un chunk ?
Chaque chunk suit un cycle précis : lecture item par item jusqu'à atteindre la taille configurée, traitement individuel de chaque item, puis écriture groupée. Une transaction englobe l'ensemble du chunk.
// ItemReader : lit un item à la fois
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {
// @StepScope : nouvelle instance par exécution de step
// Permet d'injecter des paramètres de job dynamiques
@Value("#{jobParameters['startDate']}")
private LocalDate startDate;
private Iterator<OrderRecord> orderIterator;
@BeforeStep
public void initializeReader(StepExecution stepExecution) {
// Charge les données au démarrage du step
List<OrderRecord> orders = fetchOrdersFromDate(startDate);
this.orderIterator = orders.iterator();
}
@Override
public OrderRecord read() {
// Retourne null pour signaler la fin des données
// Spring Batch appelle read() jusqu'à recevoir null
if (orderIterator.hasNext()) {
return orderIterator.next();
}
return null; // Fin du dataset
}
private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
// Récupération depuis la source de données
return List.of(); // Implémentation réelle
}
}// ItemProcessor : transforme chaque item individuellement
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {
private final PricingService pricingService;
private final ValidationService validationService;
public OrderItemProcessor(PricingService pricingService,
ValidationService validationService) {
this.pricingService = pricingService;
this.validationService = validationService;
}
@Override
public ProcessedOrder process(OrderRecord item) {
// Retourner null filtre l'item (ne sera pas écrit)
if (!validationService.isValid(item)) {
return null; // Item filtré
}
// Transformation métier
BigDecimal finalPrice = pricingService.calculatePrice(item);
return new ProcessedOrder(
item.orderId(),
item.customerId(),
finalPrice,
LocalDateTime.now()
);
}
}// ItemWriter : écrit le chunk complet en une opération
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
public OrderItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Le chunk contient tous les items traités
// Écriture en batch pour optimiser les performances
List<? extends ProcessedOrder> items = chunk.getItems();
jdbcTemplate.batchUpdate(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
items,
items.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
}
);
}
}Si une exception survient pendant le traitement d'un chunk, la transaction est rollback. Le job peut ensuite reprendre depuis ce chunk grâce aux métadonnées stockées dans le JobRepository.
Question 4 : Comment choisir la taille optimale d'un chunk ?
La taille du chunk impacte directement les performances et la consommation mémoire. Un chunk trop petit multiplie les commits (overhead). Un chunk trop grand consomme trop de mémoire et allonge les rollbacks en cas d'erreur.
// Configuration dynamique de la taille de chunk
@Configuration
public class ChunkSizingConfig {
// Taille par défaut raisonnable pour la plupart des cas
private static final int DEFAULT_CHUNK_SIZE = 100;
// Pour des items légers (quelques champs)
private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;
// Pour des items volumineux (blobs, documents)
private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;
@Bean
public Step processLightDataStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<LightRecord> reader,
ItemWriter<LightRecord> writer) {
return new StepBuilder("processLightDataStep", jobRepository)
// Items légers : chunks plus grands pour moins de commits
.<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Step processDocumentsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<Document> reader,
ItemProcessor<Document, ProcessedDocument> processor,
ItemWriter<ProcessedDocument> writer) {
return new StepBuilder("processDocumentsStep", jobRepository)
// Documents volumineux : chunks petits pour limiter la mémoire
.<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}Commencer avec 100 items par chunk, puis ajuster selon les métriques : temps de commit, utilisation mémoire, et durée de rollback. Monitorer avec des listeners pour identifier le sweet spot.
Partitioning pour le traitement parallèle
Question 5 : Qu'est-ce que le partitioning et quand l'utiliser ?
Le partitioning divise un dataset en partitions indépendantes traitées en parallèle. Chaque partition s'exécute dans son propre thread (local) ou sur un worker distant. Cette approche multiplie le throughput sans sacrifier la restartabilité.
// Configuration d'un job partitionné
@Configuration
public class PartitionedJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public PartitionedJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
@Bean
public Job partitionedImportJob(Step partitionedStep) {
return new JobBuilder("partitionedImportJob", jobRepository)
.start(partitionedStep)
.build();
}
// Step manager : orchestre les partitions
@Bean
public Step partitionedStep(Partitioner partitioner,
Step workerStep,
TaskExecutor taskExecutor) {
return new StepBuilder("partitionedStep", jobRepository)
// Divise le travail via le Partitioner
.partitioner("workerStep", partitioner)
// Step exécuté pour chaque partition
.step(workerStep)
// 8 threads parallèles
.taskExecutor(taskExecutor)
// Nombre de partitions à créer
.gridSize(8)
.build();
}
// TaskExecutor pour l'exécution parallèle
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("batch-partition-");
executor.initialize();
return executor;
}
}// Partitioner basé sur des plages d'IDs
@Component
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Récupère les bornes du dataset
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);
if (minId == null || maxId == null) {
return Map.of(); // Pas de données à traiter
}
// Calcule la taille de chaque partition
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long start = minId + (i * range);
long end = Math.min(start + range - 1, maxId);
// Chaque partition reçoit ses bornes
context.putLong("minId", start);
context.putLong("maxId", end);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}Le partitioning convient aux datasets volumineux où les items sont indépendants. Les partitions doivent être équilibrées pour éviter qu'une partition lente ne ralentisse l'ensemble du job.
Question 6 : Quelle différence entre partitioning local et remote ?
Le partitioning local exécute toutes les partitions sur la même JVM avec un pool de threads. Le remote partitioning distribue les partitions sur plusieurs JVMs (workers) via un middleware de messaging.
// Configuration remote partitioning avec messaging
@Configuration
public class RemotePartitioningConfig {
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
MessageChannelPartitionHandler partitionHandler) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
// Handler qui communique avec les workers distants
.partitionHandler(partitionHandler)
.build();
}
// PartitionHandler envoie les ExecutionContext aux workers
@Bean
public MessageChannelPartitionHandler partitionHandler(
MessagingTemplate messagingTemplate,
JobExplorer jobExplorer) {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("workerStep");
handler.setGridSize(4);
handler.setMessagingOperations(messagingTemplate);
handler.setJobExplorer(jobExplorer);
// Timeout pour attendre la fin des workers
handler.setPollInterval(5000L);
return handler;
}
}// Configuration côté worker
@Configuration
public class WorkerConfiguration {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public WorkerConfiguration(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Le worker reçoit les partitions et exécute le step
@Bean
public Step workerStep(ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("workerStep", jobRepository)
.<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
// Reader configuré en @StepScope pour recevoir les paramètres de partition
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// Reader qui utilise les bornes de la partition
@Bean
@StepScope
public JdbcCursorItemReader<OrderRecord> partitionedReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcCursorItemReaderBuilder<OrderRecord>()
.name("partitionedOrderReader")
.dataSource(dataSource)
.sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
.preparedStatementSetter(ps -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.rowMapper(new OrderRecordRowMapper())
.build();
}
}Prêt à réussir tes entretiens Spring Boot ?
Entraîne-toi avec nos simulateurs interactifs, fiches express et tests techniques.
Fault Tolerance et récupération d'erreurs
Question 7 : Quels mécanismes de fault tolerance offre Spring Batch ?
Spring Batch propose trois mécanismes complémentaires : skip (ignorer les items en erreur), retry (réessayer automatiquement), et restart (reprendre un job échoué). Ces mécanismes se configurent au niveau du step.
// Configuration complète de fault tolerance
@Configuration
public class FaultTolerantStepConfig {
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<DataRecord> reader,
ItemProcessor<DataRecord, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
SkipPolicy customSkipPolicy) {
return new StepBuilder("faultTolerantStep", jobRepository)
.<DataRecord, ProcessedRecord>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Active le mode fault tolerant
.faultTolerant()
// SKIP : ignore jusqu'à 10 erreurs de validation
.skipLimit(10)
.skip(ValidationException.class)
.skip(DataIntegrityViolationException.class)
// Certaines erreurs ne doivent jamais être skippées
.noSkip(FatalBatchException.class)
// RETRY : réessaie les erreurs transitoires
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(DeadlockLoserDataAccessException.class)
// Backoff exponentiel entre les retries
.backOffPolicy(exponentialBackOffPolicy())
// Listener pour logger les skips
.listener(skipListener())
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000); // 1 seconde
policy.setMultiplier(2.0); // Double à chaque retry
policy.setMaxInterval(10000); // Max 10 secondes
return policy;
}
@Bean
public SkipListener<DataRecord, ProcessedRecord> skipListener() {
return new SkipListener<>() {
@Override
public void onSkipInRead(Throwable t) {
// Log item non lisible
}
@Override
public void onSkipInProcess(DataRecord item, Throwable t) {
// Log item qui a échoué au processing
}
@Override
public void onSkipInWrite(ProcessedRecord item, Throwable t) {
// Log item qui a échoué à l'écriture
}
};
}
}Le retry convient aux erreurs transitoires (timeout réseau, deadlock base de données). Le skip convient aux erreurs de données individuelles qui ne doivent pas bloquer le traitement global.
Question 8 : Comment implémenter une SkipPolicy personnalisée ?
Une SkipPolicy personnalisée permet une logique de décision fine : skipper selon le type d'exception, le nombre d'erreurs, ou des critères métier spécifiques.
// SkipPolicy avec logique métier avancée
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 100;
private static final double MAX_SKIP_PERCENTAGE = 0.05; // 5% max
private final AtomicInteger totalProcessed = new AtomicInteger(0);
private final AtomicInteger skipCount = new AtomicInteger(0);
@Override
public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
// Jamais skipper les erreurs fatales
if (exception instanceof FatalBatchException
|| exception instanceof OutOfMemoryError) {
return false;
}
// Limite absolue de skips
if (skipCountSoFar >= MAX_SKIP_COUNT) {
return false; // Arrête le job
}
// Limite en pourcentage
int total = totalProcessed.get();
if (total > 1000) { // Applique seulement après warmup
double skipPercentage = (double) skipCountSoFar / total;
if (skipPercentage > MAX_SKIP_PERCENTAGE) {
return false; // Trop d'erreurs proportionnellement
}
}
// Skipper les erreurs de validation et données
return exception instanceof ValidationException
|| exception instanceof DataFormatException
|| exception instanceof IllegalArgumentException;
}
// Appelé par un listener pour tracker la progression
public void incrementProcessed() {
totalProcessed.incrementAndGet();
}
}Question 9 : Comment fonctionne le restart d'un job échoué ?
Le JobRepository stocke l'état de chaque exécution. Lors d'un restart, Spring Batch identifie le dernier chunk committé et reprend depuis ce point. Les items déjà traités avec succès ne sont pas retraités.
// Service de gestion des restarts
@Service
public class JobRestartService {
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final Job importJob;
public JobRestartService(JobLauncher jobLauncher,
JobExplorer jobExplorer,
JobRepository jobRepository,
@Qualifier("importJob") Job importJob) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.importJob = importJob;
}
public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
// Récupère l'exécution échouée
JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);
if (failedExecution == null) {
throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
}
// Vérifie que le job peut être redémarré
if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
throw new IllegalStateException("Only FAILED jobs can be restarted");
}
// Utilise les mêmes paramètres que l'exécution originale
JobParameters originalParams = failedExecution.getJobParameters();
// Relance le job - reprend automatiquement depuis le dernier checkpoint
return jobLauncher.run(importJob, originalParams);
}
public List<JobExecution> findRestartableJobs() {
// Liste toutes les exécutions FAILED non encore restartées
return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.FAILED)
.filter(this::isRestartable)
.toList();
}
private boolean isRestartable(JobExecution execution) {
// Vérifie qu'il n'y a pas d'exécution plus récente réussie
JobInstance instance = execution.getJobInstance();
return jobExplorer.getJobExecutions(instance).stream()
.noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
}
}Un job ne peut être restarté que si les JobParameters sont identiques. Modifier un paramètre crée une nouvelle instance de job, perdant ainsi l'historique de progression.
Scaling et optimisation
Question 10 : Quelles sont les stratégies de scaling disponibles ?
Spring Batch propose quatre stratégies : multi-threaded step (plusieurs threads lisent en parallèle), parallel steps (steps indépendants en parallèle), remote chunking (processing distribué), et partitioning (données distribuées).
// Step multi-threadé : plusieurs threads traitent le même dataset
@Configuration
public class MultiThreadedStepConfig {
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<Record> reader,
ItemProcessor<Record, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
TaskExecutor taskExecutor) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Record, ProcessedRecord>chunk(100, transactionManager)
// ATTENTION : le reader doit être thread-safe
.reader(synchronizedReader(reader))
.processor(processor)
.writer(writer)
// 4 threads traitent les chunks en parallèle
.taskExecutor(taskExecutor)
.throttleLimit(4)
.build();
}
// Wrapper pour rendre le reader thread-safe
private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate((ItemStreamReader<Record>) reader);
return syncReader;
}
}// Exécution de steps indépendants en parallèle
@Configuration
public class ParallelStepsConfig {
@Bean
public Job parallelJob(JobRepository jobRepository,
Step loadCustomersStep,
Step loadProductsStep,
Step loadOrdersStep,
Step processDataStep) {
// Flow parallèle : customers et products chargés simultanément
Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
.start(loadCustomersStep)
.build();
Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
.start(loadProductsStep)
.build();
Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
.start(loadOrdersStep)
.build();
// Split exécute les flows en parallèle
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("parallelLoadFlow")
.split(new SimpleAsyncTaskExecutor())
.add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
.build())
// Après le chargement parallèle, traitement séquentiel
.next(processDataStep)
.build()
.build();
}
}Le multi-threading convient quand le reader peut être synchronisé. Le partitioning est préférable pour les gros volumes car chaque partition a son propre reader sans contention.
Question 11 : Comment monitorer les performances d'un job ?
Spring Batch expose des métriques via les listeners et le JobRepository. L'intégration avec Micrometer permet l'export vers Prometheus, Grafana ou d'autres systèmes de monitoring.
// Configuration du monitoring avec Micrometer
@Configuration
public class BatchMetricsConfig {
private final MeterRegistry meterRegistry;
public BatchMetricsConfig(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Bean
public JobExecutionListener metricsJobListener() {
return new JobExecutionListener() {
private Timer.Sample jobTimer;
@Override
public void beforeJob(JobExecution jobExecution) {
// Démarre le timer de durée du job
jobTimer = Timer.start(meterRegistry);
Counter.builder("batch.job.started")
.tag("job", jobExecution.getJobInstance().getJobName())
.register(meterRegistry)
.increment();
}
@Override
public void afterJob(JobExecution jobExecution) {
// Enregistre la durée totale
jobTimer.stop(Timer.builder("batch.job.duration")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry));
// Compteur de jobs par statut
Counter.builder("batch.job.completed")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry)
.increment();
}
};
}
@Bean
public StepExecutionListener metricsStepListener() {
return new StepExecutionListener() {
@Override
public void afterStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
String stepName = stepExecution.getStepName();
// Métriques de throughput
Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
return null;
}
};
}
}Question 12 : Quels sont les pièges courants avec le partitioning ?
Les erreurs fréquentes incluent : partitions déséquilibrées (une partition contient 90% des données), readers non thread-safe, et gestion incorrecte de l'état entre partitions.
// Partitioner qui équilibre réellement la charge
@Component
public class BalancedPartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Compte le nombre total d'items à traiter
Integer totalCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);
if (totalCount == null || totalCount == 0) {
return Map.of();
}
// Calcule la taille cible par partition
int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);
Map<String, ExecutionContext> partitions = new HashMap<>();
// Utilise OFFSET/LIMIT pour des partitions équilibrées
// Plus coûteux que les ranges mais garantit l'équilibre
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("offset", i * itemsPerPartition);
context.putInt("limit", itemsPerPartition);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}
// OffsetBasedReader.java
// Reader compatible avec le partitioning par offset
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {
private final JdbcTemplate jdbcTemplate;
private Iterator<OrderRecord> iterator;
@Value("#{stepExecutionContext['offset']}")
private int offset;
@Value("#{stepExecutionContext['limit']}")
private int limit;
public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void open(ExecutionContext executionContext) {
// Charge exactement la portion assignée à cette partition
List<OrderRecord> records = jdbcTemplate.query(
"SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
new OrderRecordRowMapper(),
limit, offset
);
this.iterator = records.iterator();
}
@Override
public OrderRecord read() {
return iterator.hasNext() ? iterator.next() : null;
}
@Override
public void update(ExecutionContext executionContext) {
// Sauvegarde de l'état pour restart si nécessaire
}
@Override
public void close() {
// Cleanup
}
}Questions avancées pour seniors
Question 13 : Comment gérer les dépendances entre jobs ?
Spring Batch ne gère pas nativement les dépendances inter-jobs. Les solutions incluent : orchestrateurs externes (Airflow, Kubernetes CronJob), ou implémentation custom avec JobExplorer.
// Gestion des dépendances entre jobs
@Service
public class JobDependencyService {
private final JobExplorer jobExplorer;
private final JobLauncher jobLauncher;
private final Map<String, Job> jobs;
public JobDependencyService(JobExplorer jobExplorer,
JobLauncher jobLauncher,
Map<String, Job> jobs) {
this.jobExplorer = jobExplorer;
this.jobLauncher = jobLauncher;
this.jobs = jobs;
}
public JobExecution runWithDependencies(String jobName,
JobParameters params,
List<String> dependsOn) throws Exception {
// Vérifie que toutes les dépendances ont réussi
for (String dependency : dependsOn) {
if (!hasSuccessfulExecution(dependency, params)) {
throw new JobExecutionException(
"Dependency not satisfied: " + dependency);
}
}
Job job = jobs.get(jobName);
if (job == null) {
throw new IllegalArgumentException("Unknown job: " + jobName);
}
return jobLauncher.run(job, params);
}
private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
// Cherche une exécution COMPLETED avec les mêmes paramètres métier
return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
.anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
}
private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
// Compare les paramètres métier (ignore les timestamps d'exécution)
String actualDate = actual.getString("businessDate");
String expectedDate = expected.getString("businessDate");
return Objects.equals(actualDate, expectedDate);
}
}Question 14 : Comment tester efficacement un job Spring Batch ?
Les tests de jobs Spring Batch requièrent une approche en couches : tests unitaires des composants (reader, processor, writer), tests d'intégration des steps, et tests end-to-end du job complet.
// Test unitaire du processor
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {
@Mock
private PricingService pricingService;
@Mock
private ValidationService validationService;
@InjectMocks
private OrderItemProcessor processor;
@Test
void shouldProcessValidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(true);
when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));
// When
ProcessedOrder result = processor.process(input);
// Then
assertThat(result).isNotNull();
assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
}
@Test
void shouldFilterInvalidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(false);
// When
ProcessedOrder result = processor.process(input);
// Then - null signifie filtré
assertThat(result).isNull();
verify(pricingService, never()).calculatePrice(any());
}
}// Test d'intégration du job complet
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JobRepositoryTestUtils jobRepositoryTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeEach
void setup() {
// Nettoie les métadonnées entre les tests
jobRepositoryTestUtils.removeJobExecutions();
// Réinitialise les données de test
jdbcTemplate.execute("DELETE FROM processed_orders");
jdbcTemplate.execute("DELETE FROM orders");
}
@Test
void shouldCompleteJobSuccessfully() throws Exception {
// Given - données de test
insertTestOrders(100);
// When
JobParameters params = new JobParametersBuilder()
.addLocalDate("businessDate", LocalDate.now())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// Then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
assertThat(countProcessedOrders()).isEqualTo(100);
}
@Test
void shouldHandleEmptyDataset() throws Exception {
// Given - pas de données
// When
JobExecution execution = jobLauncherTestUtils.launchJob();
// Then - le job réussit même sans données
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
@Test
void shouldRestartFromFailurePoint() throws Exception {
// Given - simule une erreur à mi-chemin
insertTestOrders(100);
insertPoisonOrder(50); // Provoque une erreur
// When - première exécution échoue
JobExecution firstExecution = jobLauncherTestUtils.launchJob();
assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);
// Corrige les données
removePoisonOrder(50);
// When - restart
JobExecution restartExecution = jobLauncherTestUtils.launchJob(
firstExecution.getJobParameters());
// Then - reprend depuis le point d'échec
assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
private void insertTestOrders(int count) {
for (int i = 1; i <= count; i++) {
jdbcTemplate.update(
"INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
i, i * 10, BigDecimal.valueOf(i * 10));
}
}
private int countProcessedOrders() {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders", Integer.class);
}
}Question 15 : Comment optimiser les performances d'écriture en base ?
L'écriture représente souvent le goulot d'étranglement. Les optimisations incluent : batch inserts JDBC, désactivation des contraintes pendant le chargement, et utilisation de tables temporaires.
// Writer optimisé pour les gros volumes
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
private final DataSource dataSource;
public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
this.jdbcTemplate = jdbcTemplate;
this.dataSource = dataSource;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
List<? extends ProcessedOrder> items = chunk.getItems();
if (items.isEmpty()) {
return;
}
// Utilise un PreparedStatement avec batch
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
"VALUES (?, ?, ?, ?)")) {
for (ProcessedOrder order : items) {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
ps.addBatch();
}
// Exécute tous les inserts en une seule opération réseau
ps.executeBatch();
}
}
}
// StagingTableWriter.java
// Pattern staging table pour les très gros volumes
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {
private final JdbcTemplate jdbcTemplate;
private String stagingTable;
public StagingTableWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void beforeStep(StepExecution stepExecution) {
// Crée une table temporaire pour ce step
stagingTable = "staging_orders_" + stepExecution.getId();
jdbcTemplate.execute(
"CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Écrit dans la staging table (sans contraintes FK)
String sql = "INSERT INTO " + stagingTable +
" (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
});
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
// Copie en masse vers la table finale
jdbcTemplate.execute(
"INSERT INTO processed_orders SELECT * FROM " + stagingTable);
}
// Nettoie la staging table
jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
return stepExecution.getExitStatus();
}
}Conclusion
La maîtrise de Spring Batch 5 en entretien technique repose sur la compréhension profonde des mécanismes internes :
✅ Architecture : Job → Step → Chunk (Reader, Processor, Writer)
✅ Chunk processing : dimensionnement, cycle de vie, transactions
✅ Partitioning : local vs remote, équilibrage des partitions
✅ Fault tolerance : skip, retry, restart avec politique adaptée
✅ Scaling : multi-threading, parallel steps, remote chunking
✅ Testing : unitaire, intégration, end-to-end
✅ Optimisation : batch writes, staging tables, monitoring
Les questions avancées testent la capacité à justifier des choix architecturaux selon le contexte : volume de données, contraintes de temps, tolérance aux erreurs, et infrastructure disponible.
Passe à la pratique !
Teste tes connaissances avec nos simulateurs d'entretien et tests techniques.
Tags
Partager
Articles similaires

Questions entretien Spring Boot : propagation des transactions expliquée
Maîtrisez la propagation des transactions Spring Boot : REQUIRED, REQUIRES_NEW, NESTED et plus. 12 questions d'entretien avec exemples de code et pièges courants.

Spring Modulith : architecture modulaire monolithique expliquée
Découvrez Spring Modulith pour construire des monolithes modulaires en Java. Architecture, modules, événements asynchrones et tests avec exemples Spring Boot 3.

Spring Security 6 : Authentification JWT complète
Guide pratique pour implémenter une authentification JWT avec Spring Security 6. Configuration, génération de tokens, validation et bonnes pratiques.