Співбесіда Spring Batch 5: Партиціювання, Чанки та Відмовостійкість
Опануйте співбесіди Spring Batch 5: 15 ключових питань про партиціювання, обробку чанків і відмовостійкість з прикладами на Java 21.

Spring Batch 5 є наріжним каменем обробки великих обсягів даних в екосистемі Spring. Технічні співбесіди оцінюють здатність проєктувати надійні, масштабовані та відмовостійкі задачі. Володіння партиціюванням, чанк-орієнтованою обробкою та механізмами відмовостійкості вирізняє старших розробників.
Рекрутери перевіряють глибоке розуміння: чому обирати партиціювання замість remote chunking? Як правильно визначати розмір чанків? Ці архітектурні рішення розкривають реальний продакшн-досвід.
Фундаментальна архітектура Spring Batch 5
Питання 1: Які основні компоненти Spring Batch?
Архітектура Spring Batch базується на трьох рівнях: застосунок (задачі та бізнес-код), Batch Core (runtime-класи для запуску й керування задачами) та інфраструктура (спільні reader, writer і сервіси на кшталт RetryTemplate).
// Конфігурація задачі Spring Batch 5 з Java 21
@Configuration
public class BatchJobConfig {
// JobRepository зберігає метадані виконання
// Уможливлює рестарт і моніторинг задач
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public BatchJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Job інкапсулює повний batch-процес
// Складається з одного або кількох послідовних Step
@Bean
public Job importUserJob(Step processUsersStep, Step cleanupStep) {
return new JobBuilder("importUserJob", jobRepository)
.start(processUsersStep) // Основний крок обробки
.next(cleanupStep) // Крок очищення
.build();
}
// Step репрезентує незалежну одиницю роботи
// Дві моделі: Tasklet (одна задача) або Chunk (ітераційна обробка)
@Bean
public Step processUsersStep(ItemReader<UserRecord> reader,
ItemProcessor<UserRecord, User> processor,
ItemWriter<User> writer) {
return new StepBuilder("processUsersStep", jobRepository)
.<UserRecord, User>chunk(100, transactionManager) // Commit кожні 100 елементів
.reader(reader) // Читає вихідні дані
.processor(processor) // Трансформує кожен елемент
.writer(writer) // Записує партіями по 100
.build();
}
}JobRepository зберігає стан виконань у базі даних. Така персистентність дозволяє відновити невдалу задачу саме там, де вона зупинилася, без повторної обробки вже закомічених даних.
Питання 2: Чим Tasklet відрізняється від чанк-орієнтованої обробки?
Tasklet виконує дискретну, неітеративну дію: видалення файлу, виклик stored procedure, надсилання сповіщення. Chunk обробляє великі обсяги, поділяючи дані на керовані партії.
// Tasklet: одинична дія без ітерації
@Component
public class CleanupTasklet implements Tasklet {
private final Path tempDirectory = Path.of("/tmp/batch-work");
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// Видаляє всі тимчасові файли обробки
try (var files = Files.walk(tempDirectory)) {
files.filter(Files::isRegularFile)
.forEach(this::deleteQuietly);
}
// FINISHED означає, що tasklet завершив роботу
// CONTINUABLE перезапустить виконання (корисно для polling)
return RepeatStatus.FINISHED;
}
private void deleteQuietly(Path file) {
try {
Files.delete(file);
} catch (IOException e) {
// Лог і продовжуємо - не зупиняємо задачу через один файл
}
}
}// Чанк-обробка: оброблення великих обсягів
@Configuration
public class ChunkProcessingConfig {
@Bean
public Step processOrdersStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("processOrdersStep", jobRepository)
// Чанк 500: читає 500 елементів, обробляє, пише, потім commit
.<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Listener для моніторингу прогресу
.listener(new ChunkProgressListener())
.build();
}
}Чанк-орієнтована обробка дає критичні переваги: оптимізоване керування пам'яттю (лише поточний чанк у пам'яті), гранулярні транзакції (commit на чанк) і відновлення після збоїв на останньому закоміченому чанку.
Поглиблено про чанк-орієнтовану обробку
Питання 3: Як працює життєвий цикл чанку?
Кожен чанк проходить чіткий цикл: читання елементів по одному до досягнення налаштованого розміру, індивідуальна обробка кожного елемента, далі запис групи. Транзакція огортає весь чанк.
// ItemReader: читає по одному елементу
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {
// @StepScope: новий екземпляр на кожне виконання step
// Дозволяє інжектити динамічні параметри задачі
@Value("#{jobParameters['startDate']}")
private LocalDate startDate;
private Iterator<OrderRecord> orderIterator;
@BeforeStep
public void initializeReader(StepExecution stepExecution) {
// Завантажує дані на початку step
List<OrderRecord> orders = fetchOrdersFromDate(startDate);
this.orderIterator = orders.iterator();
}
@Override
public OrderRecord read() {
// Повертає null для сигналізації кінця даних
// Spring Batch викликає read(), доки не отримає null
if (orderIterator.hasNext()) {
return orderIterator.next();
}
return null; // Кінець датасету
}
private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
// Дістає з джерела даних
return List.of(); // Реальна реалізація
}
}// ItemProcessor: трансформує кожен елемент окремо
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {
private final PricingService pricingService;
private final ValidationService validationService;
public OrderItemProcessor(PricingService pricingService,
ValidationService validationService) {
this.pricingService = pricingService;
this.validationService = validationService;
}
@Override
public ProcessedOrder process(OrderRecord item) {
// Повернення null фільтрує елемент (не запишеться)
if (!validationService.isValid(item)) {
return null; // Елемент відфільтровано
}
// Бізнес-трансформація
BigDecimal finalPrice = pricingService.calculatePrice(item);
return new ProcessedOrder(
item.orderId(),
item.customerId(),
finalPrice,
LocalDateTime.now()
);
}
}// ItemWriter: записує весь чанк за одну операцію
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
public OrderItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Чанк містить усі оброблені елементи
// Batch-запис для оптимальної продуктивності
List<? extends ProcessedOrder> items = chunk.getItems();
jdbcTemplate.batchUpdate(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
items,
items.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
}
);
}
}Якщо під час обробки чанку виникає виняток, транзакція відкочується. Задача може потім продовжити з цього чанку, використовуючи метадані з JobRepository.
Питання 4: Як обрати оптимальний розмір чанку?
Розмір чанку прямо впливає на продуктивність і споживання пам'яті. Замалий чанк збільшує кількість commit (overhead). Завеликий чанк надмірно споживає пам'ять і подовжує rollback при збоях.
// Динамічна конфігурація розміру чанку
@Configuration
public class ChunkSizingConfig {
// Розумне значення за замовчуванням для більшості випадків
private static final int DEFAULT_CHUNK_SIZE = 100;
// Для легких елементів (мало полів)
private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;
// Для важких елементів (blob, документи)
private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;
@Bean
public Step processLightDataStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<LightRecord> reader,
ItemWriter<LightRecord> writer) {
return new StepBuilder("processLightDataStep", jobRepository)
// Легкі елементи: більші чанки для меншої кількості commit
.<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Step processDocumentsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<Document> reader,
ItemProcessor<Document, ProcessedDocument> processor,
ItemWriter<ProcessedDocument> writer) {
return new StepBuilder("processDocumentsStep", jobRepository)
// Важкі документи: менші чанки для обмеження пам'яті
.<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}Починати зі 100 елементів на чанк, далі коригувати за метриками: час commit, споживання пам'яті та тривалість rollback. Використовувати listener для виявлення оптимуму.
Партиціювання для паралельної обробки
Питання 5: Що таке партиціювання та коли його застосовувати?
Партиціювання поділяє датасет на незалежні партиції, які обробляються паралельно. Кожна партиція виконується у власному потоці (локально) або на віддаленому worker. Цей підхід множить пропускну спроможність без втрати можливості рестарту.
// Конфігурація партиційованої задачі
@Configuration
public class PartitionedJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public PartitionedJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
@Bean
public Job partitionedImportJob(Step partitionedStep) {
return new JobBuilder("partitionedImportJob", jobRepository)
.start(partitionedStep)
.build();
}
// Manager step: оркеструє партиції
@Bean
public Step partitionedStep(Partitioner partitioner,
Step workerStep,
TaskExecutor taskExecutor) {
return new StepBuilder("partitionedStep", jobRepository)
// Розподіляє роботу через Partitioner
.partitioner("workerStep", partitioner)
// Step, що виконується для кожної партиції
.step(workerStep)
// 8 паралельних потоків
.taskExecutor(taskExecutor)
// Кількість партицій для створення
.gridSize(8)
.build();
}
// TaskExecutor для паралельного виконання
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("batch-partition-");
executor.initialize();
return executor;
}
}// Partitioner на основі діапазонів ID
@Component
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Дістає межі датасету
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);
if (minId == null || maxId == null) {
return Map.of(); // Немає даних для обробки
}
// Обчислює розмір кожної партиції
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long start = minId + (i * range);
long end = Math.min(start + range - 1, maxId);
// Кожна партиція отримує свої межі
context.putLong("minId", start);
context.putLong("maxId", end);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}Партиціювання підходить для великих датасетів, де елементи незалежні. Партиції мають бути збалансованими, щоб повільна партиція не уповільнювала всю задачу.
Питання 6: Чим відрізняються локальне й віддалене партиціювання?
Локальне партиціювання виконує всі партиції у тій самій JVM з пулом потоків. Віддалене партиціювання розподіляє партиції між кількома JVM (worker) через middleware повідомлень.
// Конфігурація віддаленого партиціювання з повідомленнями
@Configuration
public class RemotePartitioningConfig {
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
MessageChannelPartitionHandler partitionHandler) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
// Handler, що спілкується з віддаленими worker
.partitionHandler(partitionHandler)
.build();
}
// PartitionHandler надсилає ExecutionContext до worker
@Bean
public MessageChannelPartitionHandler partitionHandler(
MessagingTemplate messagingTemplate,
JobExplorer jobExplorer) {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("workerStep");
handler.setGridSize(4);
handler.setMessagingOperations(messagingTemplate);
handler.setJobExplorer(jobExplorer);
// Таймаут очікування завершення worker
handler.setPollInterval(5000L);
return handler;
}
}// Конфігурація на стороні worker
@Configuration
public class WorkerConfiguration {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public WorkerConfiguration(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Worker отримує партиції та виконує step
@Bean
public Step workerStep(ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("workerStep", jobRepository)
.<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
// Reader з @StepScope для отримання параметрів партиції
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// Reader, що використовує межі партиції
@Bean
@StepScope
public JdbcCursorItemReader<OrderRecord> partitionedReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcCursorItemReaderBuilder<OrderRecord>()
.name("partitionedOrderReader")
.dataSource(dataSource)
.sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
.preparedStatementSetter(ps -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.rowMapper(new OrderRecordRowMapper())
.build();
}
}Готовий до співбесід з Spring Boot?
Практикуйся з нашими інтерактивними симуляторами, flashcards та технічними тестами.
Відмовостійкість і відновлення після помилок
Питання 7: Які механізми відмовостійкості пропонує Spring Batch?
Spring Batch пропонує три взаємодоповнюючі механізми: skip (ігнорувати елементи зі збоєм), retry (автоматично повторити) та restart (продовжити невдалу задачу). Ці механізми налаштовуються на рівні step.
// Повна конфігурація відмовостійкості
@Configuration
public class FaultTolerantStepConfig {
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<DataRecord> reader,
ItemProcessor<DataRecord, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
SkipPolicy customSkipPolicy) {
return new StepBuilder("faultTolerantStep", jobRepository)
.<DataRecord, ProcessedRecord>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Активує відмовостійкий режим
.faultTolerant()
// SKIP: ігнорує до 10 помилок валідації
.skipLimit(10)
.skip(ValidationException.class)
.skip(DataIntegrityViolationException.class)
// Деякі помилки не можна пропускати
.noSkip(FatalBatchException.class)
// RETRY: повторює тимчасові помилки
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(DeadlockLoserDataAccessException.class)
// Експоненційний backoff між повторами
.backOffPolicy(exponentialBackOffPolicy())
// Listener для логування пропусків
.listener(skipListener())
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000); // 1 секунда
policy.setMultiplier(2.0); // Подвоюється на кожному повторі
policy.setMaxInterval(10000); // Максимум 10 секунд
return policy;
}
@Bean
public SkipListener<DataRecord, ProcessedRecord> skipListener() {
return new SkipListener<>() {
@Override
public void onSkipInRead(Throwable t) {
// Лог нечитабельного елемента
}
@Override
public void onSkipInProcess(DataRecord item, Throwable t) {
// Лог елемента, що зазнав помилки в обробці
}
@Override
public void onSkipInWrite(ProcessedRecord item, Throwable t) {
// Лог елемента, що зазнав помилки в записі
}
};
}
}Retry підходить для тимчасових помилок (мережевий таймаут, deadlock БД). Skip підходить для індивідуальних помилок даних, що не повинні блокувати загальну обробку.
Питання 8: Як реалізувати власну SkipPolicy?
Власна SkipPolicy дозволяє тонку логіку рішень: пропуск за типом винятку, кількістю помилок чи специфічними бізнес-критеріями.
// SkipPolicy з просунутою бізнес-логікою
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 100;
private static final double MAX_SKIP_PERCENTAGE = 0.05; // 5% максимум
private final AtomicInteger totalProcessed = new AtomicInteger(0);
private final AtomicInteger skipCount = new AtomicInteger(0);
@Override
public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
// Ніколи не пропускати фатальні помилки
if (exception instanceof FatalBatchException
|| exception instanceof OutOfMemoryError) {
return false;
}
// Абсолютний ліміт пропусків
if (skipCountSoFar >= MAX_SKIP_COUNT) {
return false; // Зупинити задачу
}
// Відсотковий ліміт
int total = totalProcessed.get();
if (total > 1000) { // Застосовувати лише після прогріву
double skipPercentage = (double) skipCountSoFar / total;
if (skipPercentage > MAX_SKIP_PERCENTAGE) {
return false; // Забагато помилок пропорційно
}
}
// Пропускати помилки валідації та даних
return exception instanceof ValidationException
|| exception instanceof DataFormatException
|| exception instanceof IllegalArgumentException;
}
// Викликається listener-ом для відстеження прогресу
public void incrementProcessed() {
totalProcessed.incrementAndGet();
}
}Питання 9: Як працює рестарт невдалої задачі?
JobRepository зберігає стан кожного виконання. При рестарті Spring Batch визначає останній закомічений чанк і відновлює з цієї точки. Успішно оброблені елементи не обробляються повторно.
// Сервіс керування рестартом задач
@Service
public class JobRestartService {
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final Job importJob;
public JobRestartService(JobLauncher jobLauncher,
JobExplorer jobExplorer,
JobRepository jobRepository,
@Qualifier("importJob") Job importJob) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.importJob = importJob;
}
public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
// Дістає невдале виконання
JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);
if (failedExecution == null) {
throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
}
// Перевіряє, що задачу можна перезапустити
if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
throw new IllegalStateException("Only FAILED jobs can be restarted");
}
// Використовує ті ж параметри, що й оригінальне виконання
JobParameters originalParams = failedExecution.getJobParameters();
// Перезапускає задачу - автоматично продовжує з останньої точки
return jobLauncher.run(importJob, originalParams);
}
public List<JobExecution> findRestartableJobs() {
// Перелік усіх FAILED виконань, що ще не перезапущені
return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.FAILED)
.filter(this::isRestartable)
.toList();
}
private boolean isRestartable(JobExecution execution) {
// Перевіряє, що немає новішого успішного виконання
JobInstance instance = execution.getJobInstance();
return jobExplorer.getJobExecutions(instance).stream()
.noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
}
}Задачу можна перезапустити лише, якщо JobParameters ідентичні. Зміна параметра створює новий екземпляр задачі та втрачає історію прогресу.
Масштабування й оптимізація
Питання 10: Які стратегії масштабування доступні?
Spring Batch пропонує чотири стратегії: multi-threaded step (кілька потоків читають паралельно), parallel steps (незалежні step паралельно), remote chunking (розподілена обробка) і partitioning (розподілені дані).
// Multi-threaded step: кілька потоків обробляють той самий датасет
@Configuration
public class MultiThreadedStepConfig {
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<Record> reader,
ItemProcessor<Record, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
TaskExecutor taskExecutor) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Record, ProcessedRecord>chunk(100, transactionManager)
// УВАГА: reader має бути thread-safe
.reader(synchronizedReader(reader))
.processor(processor)
.writer(writer)
// 4 потоки обробляють чанки паралельно
.taskExecutor(taskExecutor)
.throttleLimit(4)
.build();
}
// Wrapper, щоб зробити reader thread-safe
private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate((ItemStreamReader<Record>) reader);
return syncReader;
}
}// Виконання незалежних step паралельно
@Configuration
public class ParallelStepsConfig {
@Bean
public Job parallelJob(JobRepository jobRepository,
Step loadCustomersStep,
Step loadProductsStep,
Step loadOrdersStep,
Step processDataStep) {
// Паралельний потік: customers і products завантажуються одночасно
Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
.start(loadCustomersStep)
.build();
Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
.start(loadProductsStep)
.build();
Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
.start(loadOrdersStep)
.build();
// Split виконує потоки паралельно
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("parallelLoadFlow")
.split(new SimpleAsyncTaskExecutor())
.add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
.build())
// Після паралельного завантаження - послідовна обробка
.next(processDataStep)
.build()
.build();
}
}Multi-threading підходить, коли reader можна синхронізувати. Партиціювання краще для великих обсягів, оскільки кожна партиція має власний reader без конкуренції.
Питання 11: Як моніторити продуктивність задачі?
Spring Batch виставляє метрики через listener і JobRepository. Інтеграція з Micrometer дозволяє експорт у Prometheus, Grafana або інші системи моніторингу.
// Конфігурація моніторингу з Micrometer
@Configuration
public class BatchMetricsConfig {
private final MeterRegistry meterRegistry;
public BatchMetricsConfig(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Bean
public JobExecutionListener metricsJobListener() {
return new JobExecutionListener() {
private Timer.Sample jobTimer;
@Override
public void beforeJob(JobExecution jobExecution) {
// Запускає таймер тривалості задачі
jobTimer = Timer.start(meterRegistry);
Counter.builder("batch.job.started")
.tag("job", jobExecution.getJobInstance().getJobName())
.register(meterRegistry)
.increment();
}
@Override
public void afterJob(JobExecution jobExecution) {
// Реєструє загальну тривалість
jobTimer.stop(Timer.builder("batch.job.duration")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry));
// Лічильник задач за статусом
Counter.builder("batch.job.completed")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry)
.increment();
}
};
}
@Bean
public StepExecutionListener metricsStepListener() {
return new StepExecutionListener() {
@Override
public void afterStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
String stepName = stepExecution.getStepName();
// Метрики throughput
Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
return null;
}
};
}
}Питання 12: Які типові пастки партиціювання?
Часті помилки: незбалансовані партиції (одна партиція містить 90% даних), не thread-safe reader і неправильне керування станом між партиціями.
// Partitioner, що реально балансує навантаження
@Component
public class BalancedPartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Підраховує загальну кількість елементів для обробки
Integer totalCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);
if (totalCount == null || totalCount == 0) {
return Map.of();
}
// Обчислює цільовий розмір партиції
int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);
Map<String, ExecutionContext> partitions = new HashMap<>();
// Використовує OFFSET/LIMIT для збалансованих партицій
// Дорожче за діапазони, але гарантує баланс
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("offset", i * itemsPerPartition);
context.putInt("limit", itemsPerPartition);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}
// OffsetBasedReader.java
// Reader, сумісний з offset-партиціюванням
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {
private final JdbcTemplate jdbcTemplate;
private Iterator<OrderRecord> iterator;
@Value("#{stepExecutionContext['offset']}")
private int offset;
@Value("#{stepExecutionContext['limit']}")
private int limit;
public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void open(ExecutionContext executionContext) {
// Завантажує саме ту частину, яку призначено цій партиції
List<OrderRecord> records = jdbcTemplate.query(
"SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
new OrderRecordRowMapper(),
limit, offset
);
this.iterator = records.iterator();
}
@Override
public OrderRecord read() {
return iterator.hasNext() ? iterator.next() : null;
}
@Override
public void update(ExecutionContext executionContext) {
// Збереження стану для рестарту, якщо потрібно
}
@Override
public void close() {
// Очищення
}
}Просунуті питання для senior
Питання 13: Як керувати залежностями між задачами?
Spring Batch не керує залежностями між задачами вбудовано. Рішення: зовнішні оркестратори (Airflow, Kubernetes CronJob) або власна реалізація з JobExplorer.
// Керування залежностями між задачами
@Service
public class JobDependencyService {
private final JobExplorer jobExplorer;
private final JobLauncher jobLauncher;
private final Map<String, Job> jobs;
public JobDependencyService(JobExplorer jobExplorer,
JobLauncher jobLauncher,
Map<String, Job> jobs) {
this.jobExplorer = jobExplorer;
this.jobLauncher = jobLauncher;
this.jobs = jobs;
}
public JobExecution runWithDependencies(String jobName,
JobParameters params,
List<String> dependsOn) throws Exception {
// Перевіряє, що всі залежності завершилися успішно
for (String dependency : dependsOn) {
if (!hasSuccessfulExecution(dependency, params)) {
throw new JobExecutionException(
"Dependency not satisfied: " + dependency);
}
}
Job job = jobs.get(jobName);
if (job == null) {
throw new IllegalArgumentException("Unknown job: " + jobName);
}
return jobLauncher.run(job, params);
}
private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
// Шукає COMPLETED виконання з тими ж бізнес-параметрами
return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
.anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
}
private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
// Порівнює бізнес-параметри (ігнорує таймстемпи виконання)
String actualDate = actual.getString("businessDate");
String expectedDate = expected.getString("businessDate");
return Objects.equals(actualDate, expectedDate);
}
}Питання 14: Як ефективно тестувати задачу Spring Batch?
Тестування Spring Batch вимагає шарувативного підходу: юніт-тести компонентів (reader, processor, writer), інтеграційні тести step і end-to-end тести повних задач.
// Юніт-тест processor
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {
@Mock
private PricingService pricingService;
@Mock
private ValidationService validationService;
@InjectMocks
private OrderItemProcessor processor;
@Test
void shouldProcessValidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(true);
when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));
// When
ProcessedOrder result = processor.process(input);
// Then
assertThat(result).isNotNull();
assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
}
@Test
void shouldFilterInvalidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(false);
// When
ProcessedOrder result = processor.process(input);
// Then - null означає відфільтровано
assertThat(result).isNull();
verify(pricingService, never()).calculatePrice(any());
}
}// Інтеграційний тест повної задачі
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JobRepositoryTestUtils jobRepositoryTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeEach
void setup() {
// Очищає метадані між тестами
jobRepositoryTestUtils.removeJobExecutions();
// Скидає тестові дані
jdbcTemplate.execute("DELETE FROM processed_orders");
jdbcTemplate.execute("DELETE FROM orders");
}
@Test
void shouldCompleteJobSuccessfully() throws Exception {
// Given - тестові дані
insertTestOrders(100);
// When
JobParameters params = new JobParametersBuilder()
.addLocalDate("businessDate", LocalDate.now())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// Then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
assertThat(countProcessedOrders()).isEqualTo(100);
}
@Test
void shouldHandleEmptyDataset() throws Exception {
// Given - немає даних
// When
JobExecution execution = jobLauncherTestUtils.launchJob();
// Then - задача успішна навіть без даних
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
@Test
void shouldRestartFromFailurePoint() throws Exception {
// Given - симулює помилку посеред обробки
insertTestOrders(100);
insertPoisonOrder(50); // Спричиняє помилку
// When - перше виконання провалюється
JobExecution firstExecution = jobLauncherTestUtils.launchJob();
assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);
// Виправити дані
removePoisonOrder(50);
// When - рестарт
JobExecution restartExecution = jobLauncherTestUtils.launchJob(
firstExecution.getJobParameters());
// Then - відновлюється з точки збою
assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
private void insertTestOrders(int count) {
for (int i = 1; i <= count; i++) {
jdbcTemplate.update(
"INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
i, i * 10, BigDecimal.valueOf(i * 10));
}
}
private int countProcessedOrders() {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders", Integer.class);
}
}Питання 15: Як оптимізувати продуктивність запису в БД?
Запис часто стає вузьким місцем. Оптимізації: JDBC batch insert, вимкнення обмежень під час завантаження та використання staging-таблиць.
// Writer, оптимізований для великих обсягів
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
private final DataSource dataSource;
public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
this.jdbcTemplate = jdbcTemplate;
this.dataSource = dataSource;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
List<? extends ProcessedOrder> items = chunk.getItems();
if (items.isEmpty()) {
return;
}
// Використовує PreparedStatement з batch
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
"VALUES (?, ?, ?, ?)")) {
for (ProcessedOrder order : items) {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
ps.addBatch();
}
// Виконує всі insert у одній мережевій операції
ps.executeBatch();
}
}
}
// StagingTableWriter.java
// Патерн staging-таблиці для дуже великих обсягів
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {
private final JdbcTemplate jdbcTemplate;
private String stagingTable;
public StagingTableWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void beforeStep(StepExecution stepExecution) {
// Створює тимчасову таблицю для цього step
stagingTable = "staging_orders_" + stepExecution.getId();
jdbcTemplate.execute(
"CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Пише в staging-таблицю (без обмежень FK)
String sql = "INSERT INTO " + stagingTable +
" (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
});
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
// Bulk copy у фінальну таблицю
jdbcTemplate.execute(
"INSERT INTO processed_orders SELECT * FROM " + stagingTable);
}
// Очищає staging-таблицю
jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
return stepExecution.getExitStatus();
}
}Висновок
Володіння Spring Batch 5 на технічних співбесідах базується на глибокому розумінні внутрішніх механізмів:
✅ Архітектура: Job → Step → Chunk (Reader, Processor, Writer)
✅ Чанк-обробка: розмір, життєвий цикл, транзакції
✅ Партиціювання: локальне vs віддалене, балансування партицій
✅ Відмовостійкість: skip, retry, restart з відповідною політикою
✅ Масштабування: multi-threading, parallel steps, remote chunking
✅ Тести: юніт, інтеграція, end-to-end
✅ Оптимізація: batch writes, staging-таблиці, моніторинг
Просунуті питання оцінюють здатність обґрунтовувати архітектурні рішення відповідно до контексту: обсяг даних, часові обмеження, толерантність до помилок і доступна інфраструктура.
Починай практикувати!
Перевір свої знання з нашими симуляторами співбесід та технічними тестами.
Теги
Поділитися
Пов'язані статті

Spring Modulith: Архітектура модульного моноліта
Опануйте Spring Modulith для побудови модульних монолітів на Java. Архітектура, модулі, асинхронні події та тестування зі Spring Boot 3.

Співбесіда Spring Boot: Поширення Транзакцій
Опануйте поширення транзакцій у Spring Boot: REQUIRED, REQUIRES_NEW, NESTED тощо. 12 питань зі співбесід з кодом та поширеними пастками.

Spring Security 6: Повна Автентифікація JWT
Практичний посібник із впровадження автентифікації JWT у Spring Security 6: конфігурація, генерація токенів, валідація та найкращі практики безпеки.