Rozmowa Spring Batch 5: Partycjonowanie, Chunki i Tolerancja Błędów
Opanuj rozmowy o pracę ze Spring Batch 5: 15 kluczowych pytań o partycjonowanie, przetwarzanie chunkowe i tolerancję błędów z przykładami w Java 21.

Spring Batch 5 stanowi filar przetwarzania dużych zbiorów danych w ekosystemie Spring. Rozmowy techniczne oceniają umiejętność projektowania solidnych, skalowalnych i odpornych na błędy zadań. Opanowanie partycjonowania, przetwarzania chunkowego i mechanizmów tolerancji błędów wyróżnia starszych programistów.
Rekruterzy testują dogłębne zrozumienie: dlaczego wybrać partycjonowanie zamiast remote chunking? Jak prawidłowo dobrać rozmiar chunków? Te decyzje architektoniczne ujawniają realne doświadczenie produkcyjne.
Architektura fundamentalna Spring Batch 5
Pytanie 1: Jakie są główne komponenty Spring Batch?
Architektura Spring Batch opiera się na trzech warstwach: aplikacji (zadania i kod biznesowy), Batch Core (klasy runtime do uruchamiania i kontrolowania zadań) oraz infrastrukturze (wspólne readery, writery i usługi jak RetryTemplate).
// Konfiguracja zadania Spring Batch 5 z Java 21
@Configuration
public class BatchJobConfig {
// JobRepository przechowuje metadane wykonania
// Umożliwia restart i monitoring zadań
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public BatchJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Job enkapsuluje kompletny proces batch
// Składa się z jednego lub więcej Stepów wykonywanych sekwencyjnie
@Bean
public Job importUserJob(Step processUsersStep, Step cleanupStep) {
return new JobBuilder("importUserJob", jobRepository)
.start(processUsersStep) // Główny step przetwarzania
.next(cleanupStep) // Step czyszczący
.build();
}
// Step reprezentuje niezależną jednostkę pracy
// Dwa modele: Tasklet (pojedyncze zadanie) lub Chunk (przetwarzanie iteracyjne)
@Bean
public Step processUsersStep(ItemReader<UserRecord> reader,
ItemProcessor<UserRecord, User> processor,
ItemWriter<User> writer) {
return new StepBuilder("processUsersStep", jobRepository)
.<UserRecord, User>chunk(100, transactionManager) // Commit co 100 elementów
.reader(reader) // Czyta dane źródłowe
.processor(processor) // Transformuje każdy element
.writer(writer) // Zapisuje partiami po 100
.build();
}
}JobRepository utrwala stan wykonań w bazie danych. Ta trwałość pozwala wznowić nieudane zadanie dokładnie tam, gdzie się zatrzymało, bez ponownego przetwarzania zatwierdzonych danych.
Pytanie 2: Jaka jest różnica między Taskletem a przetwarzaniem chunkowym?
Tasklet wykonuje pojedynczą, nieiteracyjną akcję: usunięcie pliku, wywołanie procedury składowanej, wysłanie e-maila. Chunk przetwarza duże wolumeny, dzieląc dane na zarządzalne partie.
// Tasklet: pojedyncza akcja bez iteracji
@Component
public class CleanupTasklet implements Tasklet {
private final Path tempDirectory = Path.of("/tmp/batch-work");
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// Usuwa wszystkie pliki tymczasowe z przetwarzania
try (var files = Files.walk(tempDirectory)) {
files.filter(Files::isRegularFile)
.forEach(this::deleteQuietly);
}
// FINISHED oznacza, że tasklet ukończył pracę
// CONTINUABLE wznowiłby wykonanie (przydatne dla pollingu)
return RepeatStatus.FINISHED;
}
private void deleteQuietly(Path file) {
try {
Files.delete(file);
} catch (IOException e) {
// Loguj i kontynuuj - nie przerywaj zadania z powodu jednego pliku
}
}
}// Przetwarzanie chunkowe: obsługa dużych wolumenów
@Configuration
public class ChunkProcessingConfig {
@Bean
public Step processOrdersStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("processOrdersStep", jobRepository)
// Chunk 500: czyta 500 elementów, przetwarza, zapisuje, commit
.<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Listener do monitorowania postępu
.listener(new ChunkProgressListener())
.build();
}
}Przetwarzanie chunkowe oferuje krytyczne korzyści: zoptymalizowane zarządzanie pamięcią (tylko bieżący chunk w pamięci), granularne transakcje (commit per chunk) i odzyskiwanie po awarii w ostatnim zatwierdzonym chunku.
Pogłębienie przetwarzania chunkowego
Pytanie 3: Jak działa cykl życia chunka?
Każdy chunk podąża dokładnym cyklem: czytanie elementów po jednym do osiągnięcia skonfigurowanego rozmiaru, indywidualne przetwarzanie każdego elementu, następnie zapis grupy. Transakcja obejmuje cały chunk.
// ItemReader: czyta jeden element na raz
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {
// @StepScope: nowa instancja dla każdego wykonania stepu
// Pozwala wstrzyknąć dynamiczne parametry zadania
@Value("#{jobParameters['startDate']}")
private LocalDate startDate;
private Iterator<OrderRecord> orderIterator;
@BeforeStep
public void initializeReader(StepExecution stepExecution) {
// Ładuje dane przy starcie stepu
List<OrderRecord> orders = fetchOrdersFromDate(startDate);
this.orderIterator = orders.iterator();
}
@Override
public OrderRecord read() {
// Zwraca null, aby zasygnalizować koniec danych
// Spring Batch wywołuje read() do otrzymania null
if (orderIterator.hasNext()) {
return orderIterator.next();
}
return null; // Koniec datasetu
}
private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
// Pobiera ze źródła danych
return List.of(); // Faktyczna implementacja
}
}// ItemProcessor: transformuje każdy element indywidualnie
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {
private final PricingService pricingService;
private final ValidationService validationService;
public OrderItemProcessor(PricingService pricingService,
ValidationService validationService) {
this.pricingService = pricingService;
this.validationService = validationService;
}
@Override
public ProcessedOrder process(OrderRecord item) {
// Zwrócenie null filtruje element (nie zostanie zapisany)
if (!validationService.isValid(item)) {
return null; // Element odfiltrowany
}
// Transformacja biznesowa
BigDecimal finalPrice = pricingService.calculatePrice(item);
return new ProcessedOrder(
item.orderId(),
item.customerId(),
finalPrice,
LocalDateTime.now()
);
}
}// ItemWriter: zapisuje cały chunk w jednej operacji
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
public OrderItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Chunk zawiera wszystkie przetworzone elementy
// Zapis batch dla zoptymalizowanej wydajności
List<? extends ProcessedOrder> items = chunk.getItems();
jdbcTemplate.batchUpdate(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
items,
items.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
}
);
}
}Jeśli podczas przetwarzania chunka wystąpi wyjątek, transakcja jest wycofywana. Zadanie może następnie wznowić od tego chunka, używając metadanych przechowywanych w JobRepository.
Pytanie 4: Jak wybrać optymalny rozmiar chunka?
Rozmiar chunka bezpośrednio wpływa na wydajność i zużycie pamięci. Zbyt mały chunk mnoży commity (overhead). Zbyt duży chunk zużywa nadmiernie pamięć i wydłuża rollbacki przy awarii.
// Dynamiczna konfiguracja rozmiaru chunka
@Configuration
public class ChunkSizingConfig {
// Rozsądna wartość domyślna dla większości przypadków
private static final int DEFAULT_CHUNK_SIZE = 100;
// Dla lekkich elementów (mało pól)
private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;
// Dla ciężkich elementów (bloby, dokumenty)
private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;
@Bean
public Step processLightDataStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<LightRecord> reader,
ItemWriter<LightRecord> writer) {
return new StepBuilder("processLightDataStep", jobRepository)
// Lekkie elementy: większe chunki dla mniejszej liczby commitów
.<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Step processDocumentsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<Document> reader,
ItemProcessor<Document, ProcessedDocument> processor,
ItemWriter<ProcessedDocument> writer) {
return new StepBuilder("processDocumentsStep", jobRepository)
// Ciężkie dokumenty: mniejsze chunki dla ograniczenia pamięci
.<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}Zacząć od 100 elementów na chunk, potem dostosować na podstawie metryk: czas commit, użycie pamięci i czas trwania rollbacku. Użyj listenerów, aby zidentyfikować optymalną wartość.
Partycjonowanie dla przetwarzania równoległego
Pytanie 5: Czym jest partycjonowanie i kiedy je stosować?
Partycjonowanie dzieli dataset na niezależne partycje przetwarzane równolegle. Każda partycja działa w swoim wątku (lokalnie) lub na zdalnym workerze. To podejście mnoży przepustowość bez utraty możliwości restartu.
// Konfiguracja zadania partycjonowanego
@Configuration
public class PartitionedJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public PartitionedJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
@Bean
public Job partitionedImportJob(Step partitionedStep) {
return new JobBuilder("partitionedImportJob", jobRepository)
.start(partitionedStep)
.build();
}
// Step manager: orkiestruje partycje
@Bean
public Step partitionedStep(Partitioner partitioner,
Step workerStep,
TaskExecutor taskExecutor) {
return new StepBuilder("partitionedStep", jobRepository)
// Dzieli pracę poprzez Partitioner
.partitioner("workerStep", partitioner)
// Step wykonywany dla każdej partycji
.step(workerStep)
// 8 równoległych wątków
.taskExecutor(taskExecutor)
// Liczba partycji do utworzenia
.gridSize(8)
.build();
}
// TaskExecutor dla wykonania równoległego
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("batch-partition-");
executor.initialize();
return executor;
}
}// Partitioner oparty na zakresach ID
@Component
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Pobiera granice datasetu
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);
if (minId == null || maxId == null) {
return Map.of(); // Brak danych do przetworzenia
}
// Oblicza rozmiar każdej partycji
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long start = minId + (i * range);
long end = Math.min(start + range - 1, maxId);
// Każda partycja otrzymuje swoje granice
context.putLong("minId", start);
context.putLong("maxId", end);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}Partycjonowanie jest odpowiednie dla dużych datasetów, gdzie elementy są niezależne. Partycje muszą być zbalansowane, aby zapobiec spowolnieniu całego zadania przez wolną partycję.
Pytanie 6: Jaka jest różnica między partycjonowaniem lokalnym a zdalnym?
Partycjonowanie lokalne wykonuje wszystkie partycje w tej samej JVM z thread poolem. Partycjonowanie zdalne rozdziela partycje między wiele JVM (workery) poprzez middleware komunikatów.
// Konfiguracja partycjonowania zdalnego z komunikatami
@Configuration
public class RemotePartitioningConfig {
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
MessageChannelPartitionHandler partitionHandler) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
// Handler komunikujący się ze zdalnymi workerami
.partitionHandler(partitionHandler)
.build();
}
// PartitionHandler wysyła ExecutionContexty do workerów
@Bean
public MessageChannelPartitionHandler partitionHandler(
MessagingTemplate messagingTemplate,
JobExplorer jobExplorer) {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("workerStep");
handler.setGridSize(4);
handler.setMessagingOperations(messagingTemplate);
handler.setJobExplorer(jobExplorer);
// Timeout oczekiwania na zakończenie workerów
handler.setPollInterval(5000L);
return handler;
}
}// Konfiguracja po stronie workera
@Configuration
public class WorkerConfiguration {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public WorkerConfiguration(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Worker odbiera partycje i wykonuje step
@Bean
public Step workerStep(ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("workerStep", jobRepository)
.<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
// Reader z @StepScope dla parametrów partycji
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// Reader używający granic partycji
@Bean
@StepScope
public JdbcCursorItemReader<OrderRecord> partitionedReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcCursorItemReaderBuilder<OrderRecord>()
.name("partitionedOrderReader")
.dataSource(dataSource)
.sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
.preparedStatementSetter(ps -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.rowMapper(new OrderRecordRowMapper())
.build();
}
}Gotowy na rozmowy o Spring Boot?
Ćwicz z naszymi interaktywnymi symulatorami, flashcards i testami technicznymi.
Tolerancja błędów i odzyskiwanie
Pytanie 7: Jakie mechanizmy tolerancji błędów oferuje Spring Batch?
Spring Batch oferuje trzy uzupełniające się mechanizmy: skip (ignorowanie elementów z błędami), retry (automatyczne ponawianie) i restart (wznowienie nieudanego zadania). Mechanizmy te konfiguruje się na poziomie stepu.
// Pełna konfiguracja tolerancji błędów
@Configuration
public class FaultTolerantStepConfig {
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<DataRecord> reader,
ItemProcessor<DataRecord, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
SkipPolicy customSkipPolicy) {
return new StepBuilder("faultTolerantStep", jobRepository)
.<DataRecord, ProcessedRecord>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Aktywuje tryb fault tolerant
.faultTolerant()
// SKIP: ignoruje do 10 błędów walidacji
.skipLimit(10)
.skip(ValidationException.class)
.skip(DataIntegrityViolationException.class)
// Niektóre błędy nigdy nie powinny być pomijane
.noSkip(FatalBatchException.class)
// RETRY: ponawia błędy przejściowe
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(DeadlockLoserDataAccessException.class)
// Wykładniczy backoff między ponowieniami
.backOffPolicy(exponentialBackOffPolicy())
// Listener do logowania pominięć
.listener(skipListener())
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000); // 1 sekunda
policy.setMultiplier(2.0); // Podwaja się przy każdym ponowieniu
policy.setMaxInterval(10000); // Maksymalnie 10 sekund
return policy;
}
@Bean
public SkipListener<DataRecord, ProcessedRecord> skipListener() {
return new SkipListener<>() {
@Override
public void onSkipInRead(Throwable t) {
// Loguj nieczytelny element
}
@Override
public void onSkipInProcess(DataRecord item, Throwable t) {
// Loguj element, który nie powiódł się przy przetwarzaniu
}
@Override
public void onSkipInWrite(ProcessedRecord item, Throwable t) {
// Loguj element, który nie powiódł się przy zapisie
}
};
}
}Retry pasuje do błędów przejściowych (timeout sieci, deadlock bazy). Skip pasuje do błędów indywidualnych danych, które nie powinny blokować ogólnego przetwarzania.
Pytanie 8: Jak zaimplementować własny SkipPolicy?
Własny SkipPolicy umożliwia szczegółową logikę decyzyjną: pomijanie według typu wyjątku, liczby błędów lub specyficznych kryteriów biznesowych.
// SkipPolicy z zaawansowaną logiką biznesową
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 100;
private static final double MAX_SKIP_PERCENTAGE = 0.05; // 5% maksimum
private final AtomicInteger totalProcessed = new AtomicInteger(0);
private final AtomicInteger skipCount = new AtomicInteger(0);
@Override
public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
// Nigdy nie pomijać błędów krytycznych
if (exception instanceof FatalBatchException
|| exception instanceof OutOfMemoryError) {
return false;
}
// Bezwzględny limit pominięć
if (skipCountSoFar >= MAX_SKIP_COUNT) {
return false; // Zatrzymuje zadanie
}
// Limit procentowy
int total = totalProcessed.get();
if (total > 1000) { // Stosować dopiero po rozgrzewce
double skipPercentage = (double) skipCountSoFar / total;
if (skipPercentage > MAX_SKIP_PERCENTAGE) {
return false; // Zbyt wiele błędów proporcjonalnie
}
}
// Pomijaj błędy walidacji i danych
return exception instanceof ValidationException
|| exception instanceof DataFormatException
|| exception instanceof IllegalArgumentException;
}
// Wywoływane przez listener do śledzenia postępu
public void incrementProcessed() {
totalProcessed.incrementAndGet();
}
}Pytanie 9: Jak działa restart nieudanego zadania?
JobRepository przechowuje stan każdego wykonania. Przy restarcie Spring Batch identyfikuje ostatni zatwierdzony chunk i wznawia od tego punktu. Pomyślnie przetworzone elementy nie są ponownie przetwarzane.
// Serwis zarządzania restartami zadań
@Service
public class JobRestartService {
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final Job importJob;
public JobRestartService(JobLauncher jobLauncher,
JobExplorer jobExplorer,
JobRepository jobRepository,
@Qualifier("importJob") Job importJob) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.importJob = importJob;
}
public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
// Pobiera nieudane wykonanie
JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);
if (failedExecution == null) {
throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
}
// Sprawdza, czy zadanie może być zrestartowane
if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
throw new IllegalStateException("Only FAILED jobs can be restarted");
}
// Używa tych samych parametrów co oryginalne wykonanie
JobParameters originalParams = failedExecution.getJobParameters();
// Uruchamia ponownie zadanie - automatycznie wznawia od ostatniego checkpointu
return jobLauncher.run(importJob, originalParams);
}
public List<JobExecution> findRestartableJobs() {
// Listuje wszystkie wykonania FAILED jeszcze nie zrestartowane
return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.FAILED)
.filter(this::isRestartable)
.toList();
}
private boolean isRestartable(JobExecution execution) {
// Sprawdza, że nie istnieje nowsze udane wykonanie
JobInstance instance = execution.getJobInstance();
return jobExplorer.getJobExecutions(instance).stream()
.noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
}
}Zadanie można zrestartować tylko, jeśli JobParameters są identyczne. Modyfikacja parametru tworzy nową instancję zadania, tracąc historię postępu.
Skalowanie i optymalizacja
Pytanie 10: Jakie strategie skalowania są dostępne?
Spring Batch oferuje cztery strategie: multi-threaded step (wiele wątków czyta równolegle), parallel steps (niezależne stepy równolegle), remote chunking (przetwarzanie rozproszone) i partitioning (rozproszone dane).
// Step multi-threaded: wiele wątków przetwarza ten sam dataset
@Configuration
public class MultiThreadedStepConfig {
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<Record> reader,
ItemProcessor<Record, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
TaskExecutor taskExecutor) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Record, ProcessedRecord>chunk(100, transactionManager)
// UWAGA: reader musi być thread-safe
.reader(synchronizedReader(reader))
.processor(processor)
.writer(writer)
// 4 wątki przetwarzają chunki równolegle
.taskExecutor(taskExecutor)
.throttleLimit(4)
.build();
}
// Wrapper, aby uczynić reader thread-safe
private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate((ItemStreamReader<Record>) reader);
return syncReader;
}
}// Wykonywanie niezależnych stepów równolegle
@Configuration
public class ParallelStepsConfig {
@Bean
public Job parallelJob(JobRepository jobRepository,
Step loadCustomersStep,
Step loadProductsStep,
Step loadOrdersStep,
Step processDataStep) {
// Równoległy flow: customers i products ładowane jednocześnie
Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
.start(loadCustomersStep)
.build();
Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
.start(loadProductsStep)
.build();
Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
.start(loadOrdersStep)
.build();
// Split wykonuje flowy równolegle
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("parallelLoadFlow")
.split(new SimpleAsyncTaskExecutor())
.add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
.build())
// Po równoległym ładowaniu, sekwencyjne przetwarzanie
.next(processDataStep)
.build()
.build();
}
}Multi-threading pasuje do przypadków, gdy reader można zsynchronizować. Partycjonowanie jest preferowane dla dużych wolumenów, ponieważ każda partycja ma własny reader bez rywalizacji.
Pytanie 11: Jak monitorować wydajność zadania?
Spring Batch udostępnia metryki przez listenery i JobRepository. Integracja z Micrometer umożliwia eksport do Prometheusa, Grafany lub innych systemów monitorowania.
// Konfiguracja monitorowania z Micrometer
@Configuration
public class BatchMetricsConfig {
private final MeterRegistry meterRegistry;
public BatchMetricsConfig(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Bean
public JobExecutionListener metricsJobListener() {
return new JobExecutionListener() {
private Timer.Sample jobTimer;
@Override
public void beforeJob(JobExecution jobExecution) {
// Uruchamia timer trwania zadania
jobTimer = Timer.start(meterRegistry);
Counter.builder("batch.job.started")
.tag("job", jobExecution.getJobInstance().getJobName())
.register(meterRegistry)
.increment();
}
@Override
public void afterJob(JobExecution jobExecution) {
// Rejestruje całkowity czas trwania
jobTimer.stop(Timer.builder("batch.job.duration")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry));
// Licznik zadań według statusu
Counter.builder("batch.job.completed")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry)
.increment();
}
};
}
@Bean
public StepExecutionListener metricsStepListener() {
return new StepExecutionListener() {
@Override
public void afterStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
String stepName = stepExecution.getStepName();
// Metryki przepustowości
Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
return null;
}
};
}
}Pytanie 12: Jakie są typowe pułapki partycjonowania?
Częste błędy obejmują: niezbalansowane partycje (jedna partycja zawiera 90% danych), readery bez bezpieczeństwa wątków i nieprawidłowe zarządzanie stanem między partycjami.
// Partitioner faktycznie równoważący obciążenie
@Component
public class BalancedPartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Zlicza całkowitą liczbę elementów do przetworzenia
Integer totalCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);
if (totalCount == null || totalCount == 0) {
return Map.of();
}
// Oblicza docelowy rozmiar partycji
int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);
Map<String, ExecutionContext> partitions = new HashMap<>();
// Używa OFFSET/LIMIT dla zbalansowanych partycji
// Droższe niż zakresy, ale gwarantuje balans
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("offset", i * itemsPerPartition);
context.putInt("limit", itemsPerPartition);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}
// OffsetBasedReader.java
// Reader kompatybilny z partycjonowaniem opartym na offset
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {
private final JdbcTemplate jdbcTemplate;
private Iterator<OrderRecord> iterator;
@Value("#{stepExecutionContext['offset']}")
private int offset;
@Value("#{stepExecutionContext['limit']}")
private int limit;
public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void open(ExecutionContext executionContext) {
// Ładuje dokładnie część przydzieloną tej partycji
List<OrderRecord> records = jdbcTemplate.query(
"SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
new OrderRecordRowMapper(),
limit, offset
);
this.iterator = records.iterator();
}
@Override
public OrderRecord read() {
return iterator.hasNext() ? iterator.next() : null;
}
@Override
public void update(ExecutionContext executionContext) {
// Zapisanie stanu dla restartu, jeśli potrzebne
}
@Override
public void close() {
// Sprzątanie
}
}Pytania zaawansowane dla seniorów
Pytanie 13: Jak zarządzać zależnościami między zadaniami?
Spring Batch nie zarządza natywnie zależnościami między zadaniami. Rozwiązania obejmują: zewnętrzne orkiestratory (Airflow, Kubernetes CronJob) lub niestandardową implementację z JobExplorer.
// Zarządzanie zależnościami między zadaniami
@Service
public class JobDependencyService {
private final JobExplorer jobExplorer;
private final JobLauncher jobLauncher;
private final Map<String, Job> jobs;
public JobDependencyService(JobExplorer jobExplorer,
JobLauncher jobLauncher,
Map<String, Job> jobs) {
this.jobExplorer = jobExplorer;
this.jobLauncher = jobLauncher;
this.jobs = jobs;
}
public JobExecution runWithDependencies(String jobName,
JobParameters params,
List<String> dependsOn) throws Exception {
// Sprawdza, czy wszystkie zależności zakończyły się sukcesem
for (String dependency : dependsOn) {
if (!hasSuccessfulExecution(dependency, params)) {
throw new JobExecutionException(
"Dependency not satisfied: " + dependency);
}
}
Job job = jobs.get(jobName);
if (job == null) {
throw new IllegalArgumentException("Unknown job: " + jobName);
}
return jobLauncher.run(job, params);
}
private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
// Szuka wykonania COMPLETED z tymi samymi parametrami biznesowymi
return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
.anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
}
private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
// Porównuje parametry biznesowe (ignoruje timestampy wykonania)
String actualDate = actual.getString("businessDate");
String expectedDate = expected.getString("businessDate");
return Objects.equals(actualDate, expectedDate);
}
}Pytanie 14: Jak skutecznie testować zadanie Spring Batch?
Testowanie zadań Spring Batch wymaga warstwowego podejścia: testy jednostkowe komponentów (reader, processor, writer), testy integracyjne stepów i testy end-to-end pełnych zadań.
// Test jednostkowy procesora
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {
@Mock
private PricingService pricingService;
@Mock
private ValidationService validationService;
@InjectMocks
private OrderItemProcessor processor;
@Test
void shouldProcessValidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(true);
when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));
// When
ProcessedOrder result = processor.process(input);
// Then
assertThat(result).isNotNull();
assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
}
@Test
void shouldFilterInvalidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(false);
// When
ProcessedOrder result = processor.process(input);
// Then - null oznacza odfiltrowane
assertThat(result).isNull();
verify(pricingService, never()).calculatePrice(any());
}
}// Test integracyjny pełnego zadania
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JobRepositoryTestUtils jobRepositoryTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeEach
void setup() {
// Czyści metadane między testami
jobRepositoryTestUtils.removeJobExecutions();
// Resetuje dane testowe
jdbcTemplate.execute("DELETE FROM processed_orders");
jdbcTemplate.execute("DELETE FROM orders");
}
@Test
void shouldCompleteJobSuccessfully() throws Exception {
// Given - dane testowe
insertTestOrders(100);
// When
JobParameters params = new JobParametersBuilder()
.addLocalDate("businessDate", LocalDate.now())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// Then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
assertThat(countProcessedOrders()).isEqualTo(100);
}
@Test
void shouldHandleEmptyDataset() throws Exception {
// Given - brak danych
// When
JobExecution execution = jobLauncherTestUtils.launchJob();
// Then - zadanie odnosi sukces nawet bez danych
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
@Test
void shouldRestartFromFailurePoint() throws Exception {
// Given - symuluje błąd w środku przetwarzania
insertTestOrders(100);
insertPoisonOrder(50); // Powoduje błąd
// When - pierwsze wykonanie kończy się niepowodzeniem
JobExecution firstExecution = jobLauncherTestUtils.launchJob();
assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);
// Naprawia dane
removePoisonOrder(50);
// When - restart
JobExecution restartExecution = jobLauncherTestUtils.launchJob(
firstExecution.getJobParameters());
// Then - wznawia od punktu błędu
assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
private void insertTestOrders(int count) {
for (int i = 1; i <= count; i++) {
jdbcTemplate.update(
"INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
i, i * 10, BigDecimal.valueOf(i * 10));
}
}
private int countProcessedOrders() {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders", Integer.class);
}
}Pytanie 15: Jak zoptymalizować wydajność zapisu do bazy danych?
Zapis często staje się wąskim gardłem. Optymalizacje obejmują: batch inserty JDBC, wyłączenie ograniczeń podczas ładowania i użycie tabel staging.
// Writer zoptymalizowany dla dużych wolumenów
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
private final DataSource dataSource;
public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
this.jdbcTemplate = jdbcTemplate;
this.dataSource = dataSource;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
List<? extends ProcessedOrder> items = chunk.getItems();
if (items.isEmpty()) {
return;
}
// Używa PreparedStatement z batch
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
"VALUES (?, ?, ?, ?)")) {
for (ProcessedOrder order : items) {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
ps.addBatch();
}
// Wykonuje wszystkie inserty w jednej operacji sieciowej
ps.executeBatch();
}
}
}
// StagingTableWriter.java
// Wzorzec staging table dla bardzo dużych wolumenów
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {
private final JdbcTemplate jdbcTemplate;
private String stagingTable;
public StagingTableWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void beforeStep(StepExecution stepExecution) {
// Tworzy tymczasową tabelę dla tego stepu
stagingTable = "staging_orders_" + stepExecution.getId();
jdbcTemplate.execute(
"CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Zapisuje do tabeli staging (bez ograniczeń FK)
String sql = "INSERT INTO " + stagingTable +
" (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
});
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
// Bulk copy do tabeli docelowej
jdbcTemplate.execute(
"INSERT INTO processed_orders SELECT * FROM " + stagingTable);
}
// Czyści tabelę staging
jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
return stepExecution.getExitStatus();
}
}Podsumowanie
Opanowanie Spring Batch 5 na rozmowach technicznych opiera się na dogłębnym zrozumieniu mechanizmów wewnętrznych:
✅ Architektura: Job → Step → Chunk (Reader, Processor, Writer)
✅ Przetwarzanie chunkowe: dobór rozmiaru, cykl życia, transakcje
✅ Partycjonowanie: lokalne vs zdalne, równoważenie partycji
✅ Tolerancja błędów: skip, retry, restart z odpowiednią polityką
✅ Skalowanie: multi-threading, parallel steps, remote chunking
✅ Testy: jednostkowe, integracyjne, end-to-end
✅ Optymalizacja: batch writes, tabele staging, monitorowanie
Zaawansowane pytania oceniają umiejętność uzasadniania wyborów architektonicznych w zależności od kontekstu: wolumen danych, ograniczenia czasowe, tolerancja błędów i dostępna infrastruktura.
Zacznij ćwiczyć!
Sprawdź swoją wiedzę z naszymi symulatorami rozmów i testami technicznymi.
Tagi
Udostępnij
Powiązane artykuły

Spring Modulith: Architektura Modularnego Monolitu Wyjaśniona
Naucz się Spring Modulith do budowy modularnych monolitów w Javie. Architektura, moduły, eventy asynchroniczne i testy z przykładami Spring Boot 3.

Rozmowa Spring Boot: Propagacja Transakcji
Opanuj propagację transakcji w Spring Boot: REQUIRED, REQUIRES_NEW, NESTED i więcej. 12 pytań rekrutacyjnych z kodem i typowymi pułapkami.

Spring Security 6: Pełne uwierzytelnianie JWT
Praktyczny przewodnik wdrażania uwierzytelniania JWT w Spring Security 6: konfiguracja, generowanie tokenów, walidacja i najlepsze praktyki bezpieczeństwa.