Entrevista Spring Batch 5: Particionamiento, Chunks y Tolerancia
Domina las entrevistas de Spring Batch 5: 15 preguntas esenciales sobre particionamiento, procesamiento por chunks y tolerancia a fallos con ejemplos en Java 21.

Spring Batch 5 representa un pilar del procesamiento de datos masivos en el ecosistema Spring. Las entrevistas técnicas evalúan la capacidad de diseñar jobs robustos, escalables y tolerantes a fallos. Dominar el particionamiento, el procesamiento por chunks y los mecanismos de tolerancia a fallos distingue a los desarrolladores senior.
Los reclutadores prueban la comprensión profunda: ¿por qué elegir partitioning sobre remote chunking? ¿Cómo dimensionar correctamente los chunks? Estas decisiones arquitectónicas revelan experiencia real en producción.
Arquitectura fundamental de Spring Batch 5
Pregunta 1: ¿Cuáles son los componentes principales de Spring Batch?
La arquitectura de Spring Batch se basa en tres capas distintas: la aplicación (jobs y código de negocio), el Batch Core (clases runtime para lanzar y controlar jobs) y la infraestructura (readers, writers y servicios comunes como RetryTemplate).
// Configuración de un job Spring Batch 5 con Java 21
@Configuration
public class BatchJobConfig {
// JobRepository almacena los metadatos de ejecución
// Permite el reinicio y monitoreo de jobs
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public BatchJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Un Job encapsula el proceso batch completo
// Compuesto por uno o más Steps ejecutados secuencialmente
@Bean
public Job importUserJob(Step processUsersStep, Step cleanupStep) {
return new JobBuilder("importUserJob", jobRepository)
.start(processUsersStep) // Step principal de procesamiento
.next(cleanupStep) // Step de limpieza
.build();
}
// Un Step representa una unidad de trabajo independiente
// Dos modelos: Tasklet (tarea única) o Chunk (procesamiento iterativo)
@Bean
public Step processUsersStep(ItemReader<UserRecord> reader,
ItemProcessor<UserRecord, User> processor,
ItemWriter<User> writer) {
return new StepBuilder("processUsersStep", jobRepository)
.<UserRecord, User>chunk(100, transactionManager) // Commit cada 100 items
.reader(reader) // Lee los datos fuente
.processor(processor) // Transforma cada item
.writer(writer) // Escribe por lotes de 100
.build();
}
}El JobRepository persiste el estado de las ejecuciones en base de datos. Esta persistencia permite reiniciar un job fallido exactamente donde se detuvo, sin reprocesar datos ya confirmados.
Pregunta 2: ¿Cuál es la diferencia entre Tasklet y procesamiento por Chunk?
Tasklet ejecuta una acción discreta y no repetitiva: eliminar archivos, llamar a un procedure almacenado, enviar emails de notificación. Chunk procesa volúmenes masivos dividiendo los datos en lotes manejables.
// Tasklet: acción única sin iteración
@Component
public class CleanupTasklet implements Tasklet {
private final Path tempDirectory = Path.of("/tmp/batch-work");
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// Elimina todos los archivos temporales del procesamiento
try (var files = Files.walk(tempDirectory)) {
files.filter(Files::isRegularFile)
.forEach(this::deleteQuietly);
}
// FINISHED indica que el tasklet completó su trabajo
// CONTINUABLE reiniciaría la ejecución (útil para polling)
return RepeatStatus.FINISHED;
}
private void deleteQuietly(Path file) {
try {
Files.delete(file);
} catch (IOException e) {
// Loguea y continúa - no falla el job por un archivo
}
}
}// Procesamiento por chunks: procesamiento de alto volumen
@Configuration
public class ChunkProcessingConfig {
@Bean
public Step processOrdersStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("processOrdersStep", jobRepository)
// Chunk de 500: lee 500 items, procesa, escribe y luego commit
.<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Listener para monitorear el progreso
.listener(new ChunkProgressListener())
.build();
}
}El procesamiento orientado a chunks aporta beneficios críticos: gestión optimizada de memoria (solo el chunk actual en memoria), transacciones granulares (commit por chunk) y recuperación de fallos en el último chunk confirmado.
Profundizando en el procesamiento por Chunks
Pregunta 3: ¿Cómo funciona el ciclo de vida de un chunk?
Cada chunk sigue un ciclo preciso: lectura de items uno a uno hasta alcanzar el tamaño configurado, procesamiento individual de cada item, luego escritura del grupo. Una transacción envuelve todo el chunk.
// ItemReader: lee un item a la vez
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {
// @StepScope: nueva instancia por ejecución del step
// Permite inyectar parámetros dinámicos del job
@Value("#{jobParameters['startDate']}")
private LocalDate startDate;
private Iterator<OrderRecord> orderIterator;
@BeforeStep
public void initializeReader(StepExecution stepExecution) {
// Carga los datos al inicio del step
List<OrderRecord> orders = fetchOrdersFromDate(startDate);
this.orderIterator = orders.iterator();
}
@Override
public OrderRecord read() {
// Devuelve null para señalar el fin de los datos
// Spring Batch llama a read() hasta recibir null
if (orderIterator.hasNext()) {
return orderIterator.next();
}
return null; // Fin del dataset
}
private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
// Recupera desde la fuente de datos
return List.of(); // Implementación real
}
}// ItemProcessor: transforma cada item individualmente
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {
private final PricingService pricingService;
private final ValidationService validationService;
public OrderItemProcessor(PricingService pricingService,
ValidationService validationService) {
this.pricingService = pricingService;
this.validationService = validationService;
}
@Override
public ProcessedOrder process(OrderRecord item) {
// Devolver null filtra el item (no será escrito)
if (!validationService.isValid(item)) {
return null; // Item filtrado
}
// Transformación de negocio
BigDecimal finalPrice = pricingService.calculatePrice(item);
return new ProcessedOrder(
item.orderId(),
item.customerId(),
finalPrice,
LocalDateTime.now()
);
}
}// ItemWriter: escribe el chunk completo en una sola operación
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
public OrderItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// El chunk contiene todos los items procesados
// Escritura por lotes para rendimiento optimizado
List<? extends ProcessedOrder> items = chunk.getItems();
jdbcTemplate.batchUpdate(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
items,
items.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
}
);
}
}Si ocurre una excepción durante el procesamiento del chunk, la transacción se revierte. El job puede entonces reanudarse desde ese chunk usando los metadatos almacenados en el JobRepository.
Pregunta 4: ¿Cómo elegir el tamaño óptimo de chunk?
El tamaño del chunk impacta directamente el rendimiento y consumo de memoria. Un chunk demasiado pequeño multiplica los commits (overhead). Un chunk demasiado grande consume memoria excesiva y alarga los rollbacks en caso de fallo.
// Configuración dinámica del tamaño de chunk
@Configuration
public class ChunkSizingConfig {
// Valor por defecto razonable para la mayoría de casos
private static final int DEFAULT_CHUNK_SIZE = 100;
// Para items ligeros (pocos campos)
private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;
// Para items pesados (blobs, documentos)
private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;
@Bean
public Step processLightDataStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<LightRecord> reader,
ItemWriter<LightRecord> writer) {
return new StepBuilder("processLightDataStep", jobRepository)
// Items ligeros: chunks más grandes para menos commits
.<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Step processDocumentsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<Document> reader,
ItemProcessor<Document, ProcessedDocument> processor,
ItemWriter<ProcessedDocument> writer) {
return new StepBuilder("processDocumentsStep", jobRepository)
// Documentos pesados: chunks más pequeños para limitar memoria
.<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}Comenzar con 100 items por chunk, luego ajustar según métricas: tiempo de commit, uso de memoria y duración del rollback. Usar listeners para monitorear e identificar el punto óptimo.
Particionamiento para procesamiento paralelo
Pregunta 5: ¿Qué es el particionamiento y cuándo usarlo?
El particionamiento divide un dataset en particiones independientes procesadas en paralelo. Cada partición se ejecuta en su propio thread (local) o en un worker remoto. Este enfoque multiplica el throughput sin sacrificar la capacidad de reinicio.
// Configuración de un job particionado
@Configuration
public class PartitionedJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public PartitionedJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
@Bean
public Job partitionedImportJob(Step partitionedStep) {
return new JobBuilder("partitionedImportJob", jobRepository)
.start(partitionedStep)
.build();
}
// Step manager: orquesta las particiones
@Bean
public Step partitionedStep(Partitioner partitioner,
Step workerStep,
TaskExecutor taskExecutor) {
return new StepBuilder("partitionedStep", jobRepository)
// Divide el trabajo vía el Partitioner
.partitioner("workerStep", partitioner)
// Step ejecutado para cada partición
.step(workerStep)
// 8 threads paralelos
.taskExecutor(taskExecutor)
// Número de particiones a crear
.gridSize(8)
.build();
}
// TaskExecutor para ejecución paralela
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("batch-partition-");
executor.initialize();
return executor;
}
}// Partitioner basado en rangos de ID
@Component
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Recupera los límites del dataset
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);
if (minId == null || maxId == null) {
return Map.of(); // Sin datos para procesar
}
// Calcula el tamaño de cada partición
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long start = minId + (i * range);
long end = Math.min(start + range - 1, maxId);
// Cada partición recibe sus límites
context.putLong("minId", start);
context.putLong("maxId", end);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}El particionamiento es adecuado para datasets grandes donde los items son independientes. Las particiones deben estar balanceadas para evitar que una partición lenta ralentice todo el job.
Pregunta 6: ¿Cuál es la diferencia entre particionamiento local y remoto?
El particionamiento local ejecuta todas las particiones en la misma JVM con un thread pool. El particionamiento remoto distribuye las particiones entre múltiples JVMs (workers) vía middleware de mensajería.
// Configuración de particionamiento remoto con mensajería
@Configuration
public class RemotePartitioningConfig {
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
MessageChannelPartitionHandler partitionHandler) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
// Handler que comunica con los workers remotos
.partitionHandler(partitionHandler)
.build();
}
// PartitionHandler envía los ExecutionContexts a los workers
@Bean
public MessageChannelPartitionHandler partitionHandler(
MessagingTemplate messagingTemplate,
JobExplorer jobExplorer) {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("workerStep");
handler.setGridSize(4);
handler.setMessagingOperations(messagingTemplate);
handler.setJobExplorer(jobExplorer);
// Timeout de espera para que los workers terminen
handler.setPollInterval(5000L);
return handler;
}
}// Configuración del lado worker
@Configuration
public class WorkerConfiguration {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public WorkerConfiguration(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// El worker recibe particiones y ejecuta el step
@Bean
public Step workerStep(ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("workerStep", jobRepository)
.<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
// Reader configurado con @StepScope para recibir parámetros de partición
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// Reader que usa los límites de partición
@Bean
@StepScope
public JdbcCursorItemReader<OrderRecord> partitionedReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcCursorItemReaderBuilder<OrderRecord>()
.name("partitionedOrderReader")
.dataSource(dataSource)
.sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
.preparedStatementSetter(ps -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.rowMapper(new OrderRecordRowMapper())
.build();
}
}¿Listo para aprobar tus entrevistas de Spring Boot?
Practica con nuestros simuladores interactivos, flashcards y tests técnicos.
Tolerancia a fallos y recuperación de errores
Pregunta 7: ¿Qué mecanismos de tolerancia a fallos ofrece Spring Batch?
Spring Batch ofrece tres mecanismos complementarios: skip (ignorar items que fallan), retry (reintentar automáticamente) y restart (reanudar un job fallido). Estos mecanismos se configuran a nivel de step.
// Configuración completa de tolerancia a fallos
@Configuration
public class FaultTolerantStepConfig {
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<DataRecord> reader,
ItemProcessor<DataRecord, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
SkipPolicy customSkipPolicy) {
return new StepBuilder("faultTolerantStep", jobRepository)
.<DataRecord, ProcessedRecord>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Activa el modo tolerante a fallos
.faultTolerant()
// SKIP: ignora hasta 10 errores de validación
.skipLimit(10)
.skip(ValidationException.class)
.skip(DataIntegrityViolationException.class)
// Algunos errores nunca deben ser ignorados
.noSkip(FatalBatchException.class)
// RETRY: reintenta errores transitorios
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(DeadlockLoserDataAccessException.class)
// Backoff exponencial entre reintentos
.backOffPolicy(exponentialBackOffPolicy())
// Listener para loguear los skips
.listener(skipListener())
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000); // 1 segundo
policy.setMultiplier(2.0); // Se duplica en cada reintento
policy.setMaxInterval(10000); // Máximo 10 segundos
return policy;
}
@Bean
public SkipListener<DataRecord, ProcessedRecord> skipListener() {
return new SkipListener<>() {
@Override
public void onSkipInRead(Throwable t) {
// Loguea el item ilegible
}
@Override
public void onSkipInProcess(DataRecord item, Throwable t) {
// Loguea el item que falló al procesar
}
@Override
public void onSkipInWrite(ProcessedRecord item, Throwable t) {
// Loguea el item que falló al escribir
}
};
}
}Retry es adecuado para errores transitorios (timeout de red, deadlock de base de datos). Skip es adecuado para errores de datos individuales que no deben bloquear el procesamiento global.
Pregunta 8: ¿Cómo implementar un SkipPolicy personalizado?
Un SkipPolicy personalizado permite una lógica de decisión fina: ignorar según el tipo de excepción, número de errores o criterios de negocio específicos.
// SkipPolicy con lógica de negocio avanzada
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 100;
private static final double MAX_SKIP_PERCENTAGE = 0.05; // 5% máximo
private final AtomicInteger totalProcessed = new AtomicInteger(0);
private final AtomicInteger skipCount = new AtomicInteger(0);
@Override
public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
// Nunca ignorar errores fatales
if (exception instanceof FatalBatchException
|| exception instanceof OutOfMemoryError) {
return false;
}
// Límite absoluto de skips
if (skipCountSoFar >= MAX_SKIP_COUNT) {
return false; // Detiene el job
}
// Límite porcentual
int total = totalProcessed.get();
if (total > 1000) { // Aplicar solo después del warmup
double skipPercentage = (double) skipCountSoFar / total;
if (skipPercentage > MAX_SKIP_PERCENTAGE) {
return false; // Demasiados errores proporcionalmente
}
}
// Ignora errores de validación y datos
return exception instanceof ValidationException
|| exception instanceof DataFormatException
|| exception instanceof IllegalArgumentException;
}
// Llamado por un listener para rastrear el progreso
public void incrementProcessed() {
totalProcessed.incrementAndGet();
}
}Pregunta 9: ¿Cómo funciona el reinicio de un job fallido?
El JobRepository almacena el estado de cada ejecución. Al reiniciar, Spring Batch identifica el último chunk confirmado y reanuda desde ese punto. Los items procesados con éxito no se reprocesan.
// Servicio de gestión de reinicio de jobs
@Service
public class JobRestartService {
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final Job importJob;
public JobRestartService(JobLauncher jobLauncher,
JobExplorer jobExplorer,
JobRepository jobRepository,
@Qualifier("importJob") Job importJob) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.importJob = importJob;
}
public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
// Recupera la ejecución fallida
JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);
if (failedExecution == null) {
throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
}
// Verifica que el job pueda reiniciarse
if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
throw new IllegalStateException("Only FAILED jobs can be restarted");
}
// Usa los mismos parámetros que la ejecución original
JobParameters originalParams = failedExecution.getJobParameters();
// Relanza el job - reanuda automáticamente desde el último checkpoint
return jobLauncher.run(importJob, originalParams);
}
public List<JobExecution> findRestartableJobs() {
// Lista todas las ejecuciones FAILED no reiniciadas aún
return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.FAILED)
.filter(this::isRestartable)
.toList();
}
private boolean isRestartable(JobExecution execution) {
// Verifica que no exista ejecución exitosa más reciente
JobInstance instance = execution.getJobInstance();
return jobExplorer.getJobExecutions(instance).stream()
.noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
}
}Un job solo puede reiniciarse si los JobParameters son idénticos. Modificar un parámetro crea una nueva instancia de job, perdiendo el historial de progreso.
Escalado y optimización
Pregunta 10: ¿Qué estrategias de escalado están disponibles?
Spring Batch ofrece cuatro estrategias: multi-threaded step (varios threads leen en paralelo), parallel steps (steps independientes en paralelo), remote chunking (procesamiento distribuido) y partitioning (datos distribuidos).
// Step multi-threaded: varios threads procesan el mismo dataset
@Configuration
public class MultiThreadedStepConfig {
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<Record> reader,
ItemProcessor<Record, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
TaskExecutor taskExecutor) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Record, ProcessedRecord>chunk(100, transactionManager)
// CUIDADO: el reader debe ser thread-safe
.reader(synchronizedReader(reader))
.processor(processor)
.writer(writer)
// 4 threads procesan chunks en paralelo
.taskExecutor(taskExecutor)
.throttleLimit(4)
.build();
}
// Wrapper para hacer el reader thread-safe
private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate((ItemStreamReader<Record>) reader);
return syncReader;
}
}// Ejecución de steps independientes en paralelo
@Configuration
public class ParallelStepsConfig {
@Bean
public Job parallelJob(JobRepository jobRepository,
Step loadCustomersStep,
Step loadProductsStep,
Step loadOrdersStep,
Step processDataStep) {
// Flujo paralelo: customers y products cargados simultáneamente
Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
.start(loadCustomersStep)
.build();
Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
.start(loadProductsStep)
.build();
Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
.start(loadOrdersStep)
.build();
// Split ejecuta los flujos en paralelo
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("parallelLoadFlow")
.split(new SimpleAsyncTaskExecutor())
.add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
.build())
// Después de la carga paralela, procesamiento secuencial
.next(processDataStep)
.build()
.build();
}
}El multi-threading es adecuado para casos donde el reader puede sincronizarse. El particionamiento es preferible para grandes volúmenes ya que cada partición tiene su propio reader sin contención.
Pregunta 11: ¿Cómo monitorear el rendimiento de un job?
Spring Batch expone métricas vía listeners y JobRepository. La integración con Micrometer permite exportar a Prometheus, Grafana u otros sistemas de monitoreo.
// Configuración de monitoreo con Micrometer
@Configuration
public class BatchMetricsConfig {
private final MeterRegistry meterRegistry;
public BatchMetricsConfig(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Bean
public JobExecutionListener metricsJobListener() {
return new JobExecutionListener() {
private Timer.Sample jobTimer;
@Override
public void beforeJob(JobExecution jobExecution) {
// Inicia el timer de duración del job
jobTimer = Timer.start(meterRegistry);
Counter.builder("batch.job.started")
.tag("job", jobExecution.getJobInstance().getJobName())
.register(meterRegistry)
.increment();
}
@Override
public void afterJob(JobExecution jobExecution) {
// Registra la duración total
jobTimer.stop(Timer.builder("batch.job.duration")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry));
// Contador de jobs por estado
Counter.builder("batch.job.completed")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry)
.increment();
}
};
}
@Bean
public StepExecutionListener metricsStepListener() {
return new StepExecutionListener() {
@Override
public void afterStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
String stepName = stepExecution.getStepName();
// Métricas de throughput
Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
return null;
}
};
}
}Pregunta 12: ¿Cuáles son las trampas comunes con el particionamiento?
Los errores frecuentes incluyen: particiones desbalanceadas (una partición contiene 90% de los datos), readers no thread-safe y gestión incorrecta del estado entre particiones.
// Partitioner que realmente balancea la carga
@Component
public class BalancedPartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Cuenta el total de items a procesar
Integer totalCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);
if (totalCount == null || totalCount == 0) {
return Map.of();
}
// Calcula el tamaño objetivo por partición
int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);
Map<String, ExecutionContext> partitions = new HashMap<>();
// Usa OFFSET/LIMIT para particiones balanceadas
// Más costoso que rangos pero garantiza el balance
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("offset", i * itemsPerPartition);
context.putInt("limit", itemsPerPartition);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}
// OffsetBasedReader.java
// Reader compatible con particionamiento basado en offset
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {
private final JdbcTemplate jdbcTemplate;
private Iterator<OrderRecord> iterator;
@Value("#{stepExecutionContext['offset']}")
private int offset;
@Value("#{stepExecutionContext['limit']}")
private int limit;
public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void open(ExecutionContext executionContext) {
// Carga exactamente la porción asignada a esta partición
List<OrderRecord> records = jdbcTemplate.query(
"SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
new OrderRecordRowMapper(),
limit, offset
);
this.iterator = records.iterator();
}
@Override
public OrderRecord read() {
return iterator.hasNext() ? iterator.next() : null;
}
@Override
public void update(ExecutionContext executionContext) {
// Guardado de estado para reinicio si es necesario
}
@Override
public void close() {
// Limpieza
}
}Preguntas avanzadas para senior
Pregunta 13: ¿Cómo gestionar dependencias entre jobs?
Spring Batch no gestiona nativamente las dependencias entre jobs. Las soluciones incluyen: orquestadores externos (Airflow, Kubernetes CronJob) o implementación personalizada con JobExplorer.
// Gestión de dependencias entre jobs
@Service
public class JobDependencyService {
private final JobExplorer jobExplorer;
private final JobLauncher jobLauncher;
private final Map<String, Job> jobs;
public JobDependencyService(JobExplorer jobExplorer,
JobLauncher jobLauncher,
Map<String, Job> jobs) {
this.jobExplorer = jobExplorer;
this.jobLauncher = jobLauncher;
this.jobs = jobs;
}
public JobExecution runWithDependencies(String jobName,
JobParameters params,
List<String> dependsOn) throws Exception {
// Verifica que todas las dependencias hayan tenido éxito
for (String dependency : dependsOn) {
if (!hasSuccessfulExecution(dependency, params)) {
throw new JobExecutionException(
"Dependency not satisfied: " + dependency);
}
}
Job job = jobs.get(jobName);
if (job == null) {
throw new IllegalArgumentException("Unknown job: " + jobName);
}
return jobLauncher.run(job, params);
}
private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
// Busca una ejecución COMPLETED con los mismos parámetros de negocio
return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
.anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
}
private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
// Compara los parámetros de negocio (ignora timestamps de ejecución)
String actualDate = actual.getString("businessDate");
String expectedDate = expected.getString("businessDate");
return Objects.equals(actualDate, expectedDate);
}
}Pregunta 14: ¿Cómo testear eficazmente un job Spring Batch?
Testear jobs Spring Batch requiere un enfoque por capas: tests unitarios de componentes (reader, processor, writer), tests de integración de steps y tests end-to-end de jobs completos.
// Test unitario del processor
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {
@Mock
private PricingService pricingService;
@Mock
private ValidationService validationService;
@InjectMocks
private OrderItemProcessor processor;
@Test
void shouldProcessValidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(true);
when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));
// When
ProcessedOrder result = processor.process(input);
// Then
assertThat(result).isNotNull();
assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
}
@Test
void shouldFilterInvalidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(false);
// When
ProcessedOrder result = processor.process(input);
// Then - null significa filtrado
assertThat(result).isNull();
verify(pricingService, never()).calculatePrice(any());
}
}// Test de integración del job completo
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JobRepositoryTestUtils jobRepositoryTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeEach
void setup() {
// Limpia los metadatos entre tests
jobRepositoryTestUtils.removeJobExecutions();
// Resetea los datos de test
jdbcTemplate.execute("DELETE FROM processed_orders");
jdbcTemplate.execute("DELETE FROM orders");
}
@Test
void shouldCompleteJobSuccessfully() throws Exception {
// Given - datos de test
insertTestOrders(100);
// When
JobParameters params = new JobParametersBuilder()
.addLocalDate("businessDate", LocalDate.now())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// Then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
assertThat(countProcessedOrders()).isEqualTo(100);
}
@Test
void shouldHandleEmptyDataset() throws Exception {
// Given - sin datos
// When
JobExecution execution = jobLauncherTestUtils.launchJob();
// Then - el job tiene éxito incluso sin datos
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
@Test
void shouldRestartFromFailurePoint() throws Exception {
// Given - simula error a mitad de procesamiento
insertTestOrders(100);
insertPoisonOrder(50); // Causa un error
// When - primera ejecución falla
JobExecution firstExecution = jobLauncherTestUtils.launchJob();
assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);
// Corrige los datos
removePoisonOrder(50);
// When - reinicio
JobExecution restartExecution = jobLauncherTestUtils.launchJob(
firstExecution.getJobParameters());
// Then - reanuda desde el punto de fallo
assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
private void insertTestOrders(int count) {
for (int i = 1; i <= count; i++) {
jdbcTemplate.update(
"INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
i, i * 10, BigDecimal.valueOf(i * 10));
}
}
private int countProcessedOrders() {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders", Integer.class);
}
}Pregunta 15: ¿Cómo optimizar el rendimiento de escrituras en base de datos?
La escritura suele convertirse en el cuello de botella. Las optimizaciones incluyen: batch inserts JDBC, desactivación de constraints durante la carga y uso de tablas de staging.
// Writer optimizado para grandes volúmenes
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
private final DataSource dataSource;
public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
this.jdbcTemplate = jdbcTemplate;
this.dataSource = dataSource;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
List<? extends ProcessedOrder> items = chunk.getItems();
if (items.isEmpty()) {
return;
}
// Usa PreparedStatement con batch
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
"VALUES (?, ?, ?, ?)")) {
for (ProcessedOrder order : items) {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
ps.addBatch();
}
// Ejecuta todos los inserts en una sola operación de red
ps.executeBatch();
}
}
}
// StagingTableWriter.java
// Patrón de tabla de staging para volúmenes muy grandes
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {
private final JdbcTemplate jdbcTemplate;
private String stagingTable;
public StagingTableWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void beforeStep(StepExecution stepExecution) {
// Crea una tabla temporal para este step
stagingTable = "staging_orders_" + stepExecution.getId();
jdbcTemplate.execute(
"CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Escribe en la tabla de staging (sin constraints FK)
String sql = "INSERT INTO " + stagingTable +
" (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
});
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
// Bulk copy a la tabla final
jdbcTemplate.execute(
"INSERT INTO processed_orders SELECT * FROM " + stagingTable);
}
// Limpia la tabla de staging
jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
return stepExecution.getExitStatus();
}
}Conclusión
Dominar Spring Batch 5 en entrevistas técnicas se basa en una comprensión profunda de los mecanismos internos:
✅ Arquitectura: Job → Step → Chunk (Reader, Processor, Writer)
✅ Procesamiento por chunks: dimensionamiento, ciclo de vida, transacciones
✅ Particionamiento: local vs remoto, balance de particiones
✅ Tolerancia a fallos: skip, retry, restart con políticas adecuadas
✅ Escalado: multi-threading, parallel steps, remote chunking
✅ Tests: unitarios, de integración, end-to-end
✅ Optimización: batch writes, tablas de staging, monitoreo
Las preguntas avanzadas evalúan la capacidad de justificar las decisiones arquitectónicas según el contexto: volumen de datos, restricciones de tiempo, tolerancia a errores e infraestructura disponible.
¡Empieza a practicar!
Pon a prueba tu conocimiento con nuestros simuladores de entrevista y tests técnicos.
Etiquetas
Compartir
Artículos relacionados

Spring Modulith: Arquitectura de Monolito Modular Explicada
Aprende Spring Modulith para construir monolitos modulares en Java. Arquitectura, módulos, eventos asíncronos y testing con ejemplos en Spring Boot 3.

Entrevista Spring Boot: Propagación de Transacciones
Domina la propagación de transacciones en Spring Boot: REQUIRED, REQUIRES_NEW, NESTED y más. 12 preguntas de entrevista con código y trampas comunes.

Spring Security 6: Autenticación JWT Completa
Guía práctica para implementar autenticación JWT con Spring Security 6. Configuración, generación de tokens, validación y buenas prácticas de seguridad.