Rozmowa Spring Batch 5: Partycjonowanie, Chunki i Tolerancja Błędów

Opanuj rozmowy o pracę ze Spring Batch 5: 15 kluczowych pytań o partycjonowanie, przetwarzanie chunkowe i tolerancję błędów z przykładami w Java 21.

Rozmowa Spring Batch 5: partycjonowanie, chunki i tolerancja błędów

Spring Batch 5 stanowi filar przetwarzania dużych zbiorów danych w ekosystemie Spring. Rozmowy techniczne oceniają umiejętność projektowania solidnych, skalowalnych i odpornych na błędy zadań. Opanowanie partycjonowania, przetwarzania chunkowego i mechanizmów tolerancji błędów wyróżnia starszych programistów.

Kluczowy punkt rozmowy

Rekruterzy testują dogłębne zrozumienie: dlaczego wybrać partycjonowanie zamiast remote chunking? Jak prawidłowo dobrać rozmiar chunków? Te decyzje architektoniczne ujawniają realne doświadczenie produkcyjne.

Architektura fundamentalna Spring Batch 5

Pytanie 1: Jakie są główne komponenty Spring Batch?

Architektura Spring Batch opiera się na trzech warstwach: aplikacji (zadania i kod biznesowy), Batch Core (klasy runtime do uruchamiania i kontrolowania zadań) oraz infrastrukturze (wspólne readery, writery i usługi jak RetryTemplate).

BatchJobConfig.javajava
// Konfiguracja zadania Spring Batch 5 z Java 21
@Configuration
public class BatchJobConfig {

    // JobRepository przechowuje metadane wykonania
    // Umożliwia restart i monitoring zadań
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public BatchJobConfig(JobRepository jobRepository,
                          PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Job enkapsuluje kompletny proces batch
    // Składa się z jednego lub więcej Stepów wykonywanych sekwencyjnie
    @Bean
    public Job importUserJob(Step processUsersStep, Step cleanupStep) {
        return new JobBuilder("importUserJob", jobRepository)
                .start(processUsersStep)      // Główny step przetwarzania
                .next(cleanupStep)             // Step czyszczący
                .build();
    }

    // Step reprezentuje niezależną jednostkę pracy
    // Dwa modele: Tasklet (pojedyncze zadanie) lub Chunk (przetwarzanie iteracyjne)
    @Bean
    public Step processUsersStep(ItemReader<UserRecord> reader,
                                  ItemProcessor<UserRecord, User> processor,
                                  ItemWriter<User> writer) {
        return new StepBuilder("processUsersStep", jobRepository)
                .<UserRecord, User>chunk(100, transactionManager)  // Commit co 100 elementów
                .reader(reader)       // Czyta dane źródłowe
                .processor(processor) // Transformuje każdy element
                .writer(writer)       // Zapisuje partiami po 100
                .build();
    }
}

JobRepository utrwala stan wykonań w bazie danych. Ta trwałość pozwala wznowić nieudane zadanie dokładnie tam, gdzie się zatrzymało, bez ponownego przetwarzania zatwierdzonych danych.

Pytanie 2: Jaka jest różnica między Taskletem a przetwarzaniem chunkowym?

Tasklet wykonuje pojedynczą, nieiteracyjną akcję: usunięcie pliku, wywołanie procedury składowanej, wysłanie e-maila. Chunk przetwarza duże wolumeny, dzieląc dane na zarządzalne partie.

CleanupTasklet.javajava
// Tasklet: pojedyncza akcja bez iteracji
@Component
public class CleanupTasklet implements Tasklet {

    private final Path tempDirectory = Path.of("/tmp/batch-work");

    @Override
    public RepeatStatus execute(StepContribution contribution,
                                 ChunkContext chunkContext) throws Exception {
        // Usuwa wszystkie pliki tymczasowe z przetwarzania
        try (var files = Files.walk(tempDirectory)) {
            files.filter(Files::isRegularFile)
                 .forEach(this::deleteQuietly);
        }

        // FINISHED oznacza, że tasklet ukończył pracę
        // CONTINUABLE wznowiłby wykonanie (przydatne dla pollingu)
        return RepeatStatus.FINISHED;
    }

    private void deleteQuietly(Path file) {
        try {
            Files.delete(file);
        } catch (IOException e) {
            // Loguj i kontynuuj - nie przerywaj zadania z powodu jednego pliku
        }
    }
}
ChunkProcessingConfig.javajava
// Przetwarzanie chunkowe: obsługa dużych wolumenów
@Configuration
public class ChunkProcessingConfig {

    @Bean
    public Step processOrdersStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<OrderRecord> reader,
                                   ItemProcessor<OrderRecord, ProcessedOrder> processor,
                                   ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("processOrdersStep", jobRepository)
                // Chunk 500: czyta 500 elementów, przetwarza, zapisuje, commit
                .<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Listener do monitorowania postępu
                .listener(new ChunkProgressListener())
                .build();
    }
}

Przetwarzanie chunkowe oferuje krytyczne korzyści: zoptymalizowane zarządzanie pamięcią (tylko bieżący chunk w pamięci), granularne transakcje (commit per chunk) i odzyskiwanie po awarii w ostatnim zatwierdzonym chunku.

Pogłębienie przetwarzania chunkowego

Pytanie 3: Jak działa cykl życia chunka?

Każdy chunk podąża dokładnym cyklem: czytanie elementów po jednym do osiągnięcia skonfigurowanego rozmiaru, indywidualne przetwarzanie każdego elementu, następnie zapis grupy. Transakcja obejmuje cały chunk.

OrderItemReader.javajava
// ItemReader: czyta jeden element na raz
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {

    // @StepScope: nowa instancja dla każdego wykonania stepu
    // Pozwala wstrzyknąć dynamiczne parametry zadania
    @Value("#{jobParameters['startDate']}")
    private LocalDate startDate;

    private Iterator<OrderRecord> orderIterator;

    @BeforeStep
    public void initializeReader(StepExecution stepExecution) {
        // Ładuje dane przy starcie stepu
        List<OrderRecord> orders = fetchOrdersFromDate(startDate);
        this.orderIterator = orders.iterator();
    }

    @Override
    public OrderRecord read() {
        // Zwraca null, aby zasygnalizować koniec danych
        // Spring Batch wywołuje read() do otrzymania null
        if (orderIterator.hasNext()) {
            return orderIterator.next();
        }
        return null;  // Koniec datasetu
    }

    private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
        // Pobiera ze źródła danych
        return List.of();  // Faktyczna implementacja
    }
}
OrderItemProcessor.javajava
// ItemProcessor: transformuje każdy element indywidualnie
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {

    private final PricingService pricingService;
    private final ValidationService validationService;

    public OrderItemProcessor(PricingService pricingService,
                               ValidationService validationService) {
        this.pricingService = pricingService;
        this.validationService = validationService;
    }

    @Override
    public ProcessedOrder process(OrderRecord item) {
        // Zwrócenie null filtruje element (nie zostanie zapisany)
        if (!validationService.isValid(item)) {
            return null;  // Element odfiltrowany
        }

        // Transformacja biznesowa
        BigDecimal finalPrice = pricingService.calculatePrice(item);

        return new ProcessedOrder(
                item.orderId(),
                item.customerId(),
                finalPrice,
                LocalDateTime.now()
        );
    }
}
OrderItemWriter.javajava
// ItemWriter: zapisuje cały chunk w jednej operacji
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;

    public OrderItemWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // Chunk zawiera wszystkie przetworzone elementy
        // Zapis batch dla zoptymalizowanej wydajności
        List<? extends ProcessedOrder> items = chunk.getItems();

        jdbcTemplate.batchUpdate(
                "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
                items,
                items.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                }
        );
    }
}

Jeśli podczas przetwarzania chunka wystąpi wyjątek, transakcja jest wycofywana. Zadanie może następnie wznowić od tego chunka, używając metadanych przechowywanych w JobRepository.

Pytanie 4: Jak wybrać optymalny rozmiar chunka?

Rozmiar chunka bezpośrednio wpływa na wydajność i zużycie pamięci. Zbyt mały chunk mnoży commity (overhead). Zbyt duży chunk zużywa nadmiernie pamięć i wydłuża rollbacki przy awarii.

ChunkSizingConfig.javajava
// Dynamiczna konfiguracja rozmiaru chunka
@Configuration
public class ChunkSizingConfig {

    // Rozsądna wartość domyślna dla większości przypadków
    private static final int DEFAULT_CHUNK_SIZE = 100;

    // Dla lekkich elementów (mało pól)
    private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;

    // Dla ciężkich elementów (bloby, dokumenty)
    private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;

    @Bean
    public Step processLightDataStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<LightRecord> reader,
                                      ItemWriter<LightRecord> writer) {
        return new StepBuilder("processLightDataStep", jobRepository)
                // Lekkie elementy: większe chunki dla mniejszej liczby commitów
                .<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .writer(writer)
                .build();
    }

    @Bean
    public Step processDocumentsStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<Document> reader,
                                      ItemProcessor<Document, ProcessedDocument> processor,
                                      ItemWriter<ProcessedDocument> writer) {
        return new StepBuilder("processDocumentsStep", jobRepository)
                // Ciężkie dokumenty: mniejsze chunki dla ograniczenia pamięci
                .<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
Reguła praktyczna

Zacząć od 100 elementów na chunk, potem dostosować na podstawie metryk: czas commit, użycie pamięci i czas trwania rollbacku. Użyj listenerów, aby zidentyfikować optymalną wartość.

Partycjonowanie dla przetwarzania równoległego

Pytanie 5: Czym jest partycjonowanie i kiedy je stosować?

Partycjonowanie dzieli dataset na niezależne partycje przetwarzane równolegle. Każda partycja działa w swoim wątku (lokalnie) lub na zdalnym workerze. To podejście mnoży przepustowość bez utraty możliwości restartu.

PartitionedJobConfig.javajava
// Konfiguracja zadania partycjonowanego
@Configuration
public class PartitionedJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public PartitionedJobConfig(JobRepository jobRepository,
                                 PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    @Bean
    public Job partitionedImportJob(Step partitionedStep) {
        return new JobBuilder("partitionedImportJob", jobRepository)
                .start(partitionedStep)
                .build();
    }

    // Step manager: orkiestruje partycje
    @Bean
    public Step partitionedStep(Partitioner partitioner,
                                 Step workerStep,
                                 TaskExecutor taskExecutor) {
        return new StepBuilder("partitionedStep", jobRepository)
                // Dzieli pracę poprzez Partitioner
                .partitioner("workerStep", partitioner)
                // Step wykonywany dla każdej partycji
                .step(workerStep)
                // 8 równoległych wątków
                .taskExecutor(taskExecutor)
                // Liczba partycji do utworzenia
                .gridSize(8)
                .build();
    }

    // TaskExecutor dla wykonania równoległego
    @Bean
    public TaskExecutor batchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("batch-partition-");
        executor.initialize();
        return executor;
    }
}
RangePartitioner.javajava
// Partitioner oparty na zakresach ID
@Component
public class RangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public RangePartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Pobiera granice datasetu
        Long minId = jdbcTemplate.queryForObject(
                "SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
        Long maxId = jdbcTemplate.queryForObject(
                "SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);

        if (minId == null || maxId == null) {
            return Map.of();  // Brak danych do przetworzenia
        }

        // Oblicza rozmiar każdej partycji
        long range = (maxId - minId) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new HashMap<>();

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            long start = minId + (i * range);
            long end = Math.min(start + range - 1, maxId);

            // Każda partycja otrzymuje swoje granice
            context.putLong("minId", start);
            context.putLong("maxId", end);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

Partycjonowanie jest odpowiednie dla dużych datasetów, gdzie elementy są niezależne. Partycje muszą być zbalansowane, aby zapobiec spowolnieniu całego zadania przez wolną partycję.

Pytanie 6: Jaka jest różnica między partycjonowaniem lokalnym a zdalnym?

Partycjonowanie lokalne wykonuje wszystkie partycje w tej samej JVM z thread poolem. Partycjonowanie zdalne rozdziela partycje między wiele JVM (workery) poprzez middleware komunikatów.

RemotePartitioningConfig.javajava
// Konfiguracja partycjonowania zdalnego z komunikatami
@Configuration
public class RemotePartitioningConfig {

    @Bean
    public Step managerStep(JobRepository jobRepository,
                             Partitioner partitioner,
                             MessageChannelPartitionHandler partitionHandler) {
        return new StepBuilder("managerStep", jobRepository)
                .partitioner("workerStep", partitioner)
                // Handler komunikujący się ze zdalnymi workerami
                .partitionHandler(partitionHandler)
                .build();
    }

    // PartitionHandler wysyła ExecutionContexty do workerów
    @Bean
    public MessageChannelPartitionHandler partitionHandler(
            MessagingTemplate messagingTemplate,
            JobExplorer jobExplorer) {
        MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
        handler.setStepName("workerStep");
        handler.setGridSize(4);
        handler.setMessagingOperations(messagingTemplate);
        handler.setJobExplorer(jobExplorer);
        // Timeout oczekiwania na zakończenie workerów
        handler.setPollInterval(5000L);
        return handler;
    }
}
WorkerConfiguration.javajava
// Konfiguracja po stronie workera
@Configuration
public class WorkerConfiguration {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public WorkerConfiguration(JobRepository jobRepository,
                                PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Worker odbiera partycje i wykonuje step
    @Bean
    public Step workerStep(ItemReader<OrderRecord> reader,
                            ItemProcessor<OrderRecord, ProcessedOrder> processor,
                            ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("workerStep", jobRepository)
                .<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
                // Reader z @StepScope dla parametrów partycji
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    // Reader używający granic partycji
    @Bean
    @StepScope
    public JdbcCursorItemReader<OrderRecord> partitionedReader(
            DataSource dataSource,
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {
        return new JdbcCursorItemReaderBuilder<OrderRecord>()
                .name("partitionedOrderReader")
                .dataSource(dataSource)
                .sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
                .preparedStatementSetter(ps -> {
                    ps.setLong(1, minId);
                    ps.setLong(2, maxId);
                })
                .rowMapper(new OrderRecordRowMapper())
                .build();
    }
}

Gotowy na rozmowy o Spring Boot?

Ćwicz z naszymi interaktywnymi symulatorami, flashcards i testami technicznymi.

Tolerancja błędów i odzyskiwanie

Pytanie 7: Jakie mechanizmy tolerancji błędów oferuje Spring Batch?

Spring Batch oferuje trzy uzupełniające się mechanizmy: skip (ignorowanie elementów z błędami), retry (automatyczne ponawianie) i restart (wznowienie nieudanego zadania). Mechanizmy te konfiguruje się na poziomie stepu.

FaultTolerantStepConfig.javajava
// Pełna konfiguracja tolerancji błędów
@Configuration
public class FaultTolerantStepConfig {

    @Bean
    public Step faultTolerantStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<DataRecord> reader,
                                   ItemProcessor<DataRecord, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   SkipPolicy customSkipPolicy) {
        return new StepBuilder("faultTolerantStep", jobRepository)
                .<DataRecord, ProcessedRecord>chunk(100, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Aktywuje tryb fault tolerant
                .faultTolerant()
                // SKIP: ignoruje do 10 błędów walidacji
                .skipLimit(10)
                .skip(ValidationException.class)
                .skip(DataIntegrityViolationException.class)
                // Niektóre błędy nigdy nie powinny być pomijane
                .noSkip(FatalBatchException.class)
                // RETRY: ponawia błędy przejściowe
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .retry(DeadlockLoserDataAccessException.class)
                // Wykładniczy backoff między ponowieniami
                .backOffPolicy(exponentialBackOffPolicy())
                // Listener do logowania pominięć
                .listener(skipListener())
                .build();
    }

    @Bean
    public BackOffPolicy exponentialBackOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);  // 1 sekunda
        policy.setMultiplier(2.0);         // Podwaja się przy każdym ponowieniu
        policy.setMaxInterval(10000);      // Maksymalnie 10 sekund
        return policy;
    }

    @Bean
    public SkipListener<DataRecord, ProcessedRecord> skipListener() {
        return new SkipListener<>() {
            @Override
            public void onSkipInRead(Throwable t) {
                // Loguj nieczytelny element
            }

            @Override
            public void onSkipInProcess(DataRecord item, Throwable t) {
                // Loguj element, który nie powiódł się przy przetwarzaniu
            }

            @Override
            public void onSkipInWrite(ProcessedRecord item, Throwable t) {
                // Loguj element, który nie powiódł się przy zapisie
            }
        };
    }
}

Retry pasuje do błędów przejściowych (timeout sieci, deadlock bazy). Skip pasuje do błędów indywidualnych danych, które nie powinny blokować ogólnego przetwarzania.

Pytanie 8: Jak zaimplementować własny SkipPolicy?

Własny SkipPolicy umożliwia szczegółową logikę decyzyjną: pomijanie według typu wyjątku, liczby błędów lub specyficznych kryteriów biznesowych.

AdaptiveSkipPolicy.javajava
// SkipPolicy z zaawansowaną logiką biznesową
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {

    private static final int MAX_SKIP_COUNT = 100;
    private static final double MAX_SKIP_PERCENTAGE = 0.05;  // 5% maksimum

    private final AtomicInteger totalProcessed = new AtomicInteger(0);
    private final AtomicInteger skipCount = new AtomicInteger(0);

    @Override
    public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
        // Nigdy nie pomijać błędów krytycznych
        if (exception instanceof FatalBatchException
                || exception instanceof OutOfMemoryError) {
            return false;
        }

        // Bezwzględny limit pominięć
        if (skipCountSoFar >= MAX_SKIP_COUNT) {
            return false;  // Zatrzymuje zadanie
        }

        // Limit procentowy
        int total = totalProcessed.get();
        if (total > 1000) {  // Stosować dopiero po rozgrzewce
            double skipPercentage = (double) skipCountSoFar / total;
            if (skipPercentage > MAX_SKIP_PERCENTAGE) {
                return false;  // Zbyt wiele błędów proporcjonalnie
            }
        }

        // Pomijaj błędy walidacji i danych
        return exception instanceof ValidationException
                || exception instanceof DataFormatException
                || exception instanceof IllegalArgumentException;
    }

    // Wywoływane przez listener do śledzenia postępu
    public void incrementProcessed() {
        totalProcessed.incrementAndGet();
    }
}

Pytanie 9: Jak działa restart nieudanego zadania?

JobRepository przechowuje stan każdego wykonania. Przy restarcie Spring Batch identyfikuje ostatni zatwierdzony chunk i wznawia od tego punktu. Pomyślnie przetworzone elementy nie są ponownie przetwarzane.

JobRestartService.javajava
// Serwis zarządzania restartami zadań
@Service
public class JobRestartService {

    private final JobLauncher jobLauncher;
    private final JobExplorer jobExplorer;
    private final JobRepository jobRepository;
    private final Job importJob;

    public JobRestartService(JobLauncher jobLauncher,
                              JobExplorer jobExplorer,
                              JobRepository jobRepository,
                              @Qualifier("importJob") Job importJob) {
        this.jobLauncher = jobLauncher;
        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
        this.importJob = importJob;
    }

    public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
        // Pobiera nieudane wykonanie
        JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);

        if (failedExecution == null) {
            throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
        }

        // Sprawdza, czy zadanie może być zrestartowane
        if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
            throw new IllegalStateException("Only FAILED jobs can be restarted");
        }

        // Używa tych samych parametrów co oryginalne wykonanie
        JobParameters originalParams = failedExecution.getJobParameters();

        // Uruchamia ponownie zadanie - automatycznie wznawia od ostatniego checkpointu
        return jobLauncher.run(importJob, originalParams);
    }

    public List<JobExecution> findRestartableJobs() {
        // Listuje wszystkie wykonania FAILED jeszcze nie zrestartowane
        return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.FAILED)
                .filter(this::isRestartable)
                .toList();
    }

    private boolean isRestartable(JobExecution execution) {
        // Sprawdza, że nie istnieje nowsze udane wykonanie
        JobInstance instance = execution.getJobInstance();
        return jobExplorer.getJobExecutions(instance).stream()
                .noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
    }
}
Pułapka rozmowy

Zadanie można zrestartować tylko, jeśli JobParameters są identyczne. Modyfikacja parametru tworzy nową instancję zadania, tracąc historię postępu.

Skalowanie i optymalizacja

Pytanie 10: Jakie strategie skalowania są dostępne?

Spring Batch oferuje cztery strategie: multi-threaded step (wiele wątków czyta równolegle), parallel steps (niezależne stepy równolegle), remote chunking (przetwarzanie rozproszone) i partitioning (rozproszone dane).

MultiThreadedStepConfig.javajava
// Step multi-threaded: wiele wątków przetwarza ten sam dataset
@Configuration
public class MultiThreadedStepConfig {

    @Bean
    public Step multiThreadedStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<Record> reader,
                                   ItemProcessor<Record, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   TaskExecutor taskExecutor) {
        return new StepBuilder("multiThreadedStep", jobRepository)
                .<Record, ProcessedRecord>chunk(100, transactionManager)
                // UWAGA: reader musi być thread-safe
                .reader(synchronizedReader(reader))
                .processor(processor)
                .writer(writer)
                // 4 wątki przetwarzają chunki równolegle
                .taskExecutor(taskExecutor)
                .throttleLimit(4)
                .build();
    }

    // Wrapper, aby uczynić reader thread-safe
    private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
        SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
        syncReader.setDelegate((ItemStreamReader<Record>) reader);
        return syncReader;
    }
}
ParallelStepsConfig.javajava
// Wykonywanie niezależnych stepów równolegle
@Configuration
public class ParallelStepsConfig {

    @Bean
    public Job parallelJob(JobRepository jobRepository,
                            Step loadCustomersStep,
                            Step loadProductsStep,
                            Step loadOrdersStep,
                            Step processDataStep) {
        // Równoległy flow: customers i products ładowane jednocześnie
        Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
                .start(loadCustomersStep)
                .build();

        Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
                .start(loadProductsStep)
                .build();

        Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
                .start(loadOrdersStep)
                .build();

        // Split wykonuje flowy równolegle
        return new JobBuilder("parallelJob", jobRepository)
                .start(new FlowBuilder<Flow>("parallelLoadFlow")
                        .split(new SimpleAsyncTaskExecutor())
                        .add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
                        .build())
                // Po równoległym ładowaniu, sekwencyjne przetwarzanie
                .next(processDataStep)
                .build()
                .build();
    }
}

Multi-threading pasuje do przypadków, gdy reader można zsynchronizować. Partycjonowanie jest preferowane dla dużych wolumenów, ponieważ każda partycja ma własny reader bez rywalizacji.

Pytanie 11: Jak monitorować wydajność zadania?

Spring Batch udostępnia metryki przez listenery i JobRepository. Integracja z Micrometer umożliwia eksport do Prometheusa, Grafany lub innych systemów monitorowania.

BatchMetricsConfig.javajava
// Konfiguracja monitorowania z Micrometer
@Configuration
public class BatchMetricsConfig {

    private final MeterRegistry meterRegistry;

    public BatchMetricsConfig(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Bean
    public JobExecutionListener metricsJobListener() {
        return new JobExecutionListener() {

            private Timer.Sample jobTimer;

            @Override
            public void beforeJob(JobExecution jobExecution) {
                // Uruchamia timer trwania zadania
                jobTimer = Timer.start(meterRegistry);
                Counter.builder("batch.job.started")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .register(meterRegistry)
                        .increment();
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                // Rejestruje całkowity czas trwania
                jobTimer.stop(Timer.builder("batch.job.duration")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry));

                // Licznik zadań według statusu
                Counter.builder("batch.job.completed")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry)
                        .increment();
            }
        };
    }

    @Bean
    public StepExecutionListener metricsStepListener() {
        return new StepExecutionListener() {

            @Override
            public void afterStep(StepExecution stepExecution) {
                String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
                String stepName = stepExecution.getStepName();

                // Metryki przepustowości
                Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                return null;
            }
        };
    }
}

Pytanie 12: Jakie są typowe pułapki partycjonowania?

Częste błędy obejmują: niezbalansowane partycje (jedna partycja zawiera 90% danych), readery bez bezpieczeństwa wątków i nieprawidłowe zarządzanie stanem między partycjami.

BalancedPartitioner.javajava
// Partitioner faktycznie równoważący obciążenie
@Component
public class BalancedPartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Zlicza całkowitą liczbę elementów do przetworzenia
        Integer totalCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);

        if (totalCount == null || totalCount == 0) {
            return Map.of();
        }

        // Oblicza docelowy rozmiar partycji
        int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);

        Map<String, ExecutionContext> partitions = new HashMap<>();

        // Używa OFFSET/LIMIT dla zbalansowanych partycji
        // Droższe niż zakresy, ale gwarantuje balans
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("offset", i * itemsPerPartition);
            context.putInt("limit", itemsPerPartition);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

// OffsetBasedReader.java
// Reader kompatybilny z partycjonowaniem opartym na offset
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {

    private final JdbcTemplate jdbcTemplate;
    private Iterator<OrderRecord> iterator;

    @Value("#{stepExecutionContext['offset']}")
    private int offset;

    @Value("#{stepExecutionContext['limit']}")
    private int limit;

    public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // Ładuje dokładnie część przydzieloną tej partycji
        List<OrderRecord> records = jdbcTemplate.query(
                "SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
                new OrderRecordRowMapper(),
                limit, offset
        );
        this.iterator = records.iterator();
    }

    @Override
    public OrderRecord read() {
        return iterator.hasNext() ? iterator.next() : null;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // Zapisanie stanu dla restartu, jeśli potrzebne
    }

    @Override
    public void close() {
        // Sprzątanie
    }
}

Pytania zaawansowane dla seniorów

Pytanie 13: Jak zarządzać zależnościami między zadaniami?

Spring Batch nie zarządza natywnie zależnościami między zadaniami. Rozwiązania obejmują: zewnętrzne orkiestratory (Airflow, Kubernetes CronJob) lub niestandardową implementację z JobExplorer.

JobDependencyService.javajava
// Zarządzanie zależnościami między zadaniami
@Service
public class JobDependencyService {

    private final JobExplorer jobExplorer;
    private final JobLauncher jobLauncher;
    private final Map<String, Job> jobs;

    public JobDependencyService(JobExplorer jobExplorer,
                                  JobLauncher jobLauncher,
                                  Map<String, Job> jobs) {
        this.jobExplorer = jobExplorer;
        this.jobLauncher = jobLauncher;
        this.jobs = jobs;
    }

    public JobExecution runWithDependencies(String jobName,
                                             JobParameters params,
                                             List<String> dependsOn) throws Exception {
        // Sprawdza, czy wszystkie zależności zakończyły się sukcesem
        for (String dependency : dependsOn) {
            if (!hasSuccessfulExecution(dependency, params)) {
                throw new JobExecutionException(
                        "Dependency not satisfied: " + dependency);
            }
        }

        Job job = jobs.get(jobName);
        if (job == null) {
            throw new IllegalArgumentException("Unknown job: " + jobName);
        }

        return jobLauncher.run(job, params);
    }

    private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
        // Szuka wykonania COMPLETED z tymi samymi parametrami biznesowymi
        return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
                .anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
    }

    private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
        // Porównuje parametry biznesowe (ignoruje timestampy wykonania)
        String actualDate = actual.getString("businessDate");
        String expectedDate = expected.getString("businessDate");
        return Objects.equals(actualDate, expectedDate);
    }
}

Pytanie 14: Jak skutecznie testować zadanie Spring Batch?

Testowanie zadań Spring Batch wymaga warstwowego podejścia: testy jednostkowe komponentów (reader, processor, writer), testy integracyjne stepów i testy end-to-end pełnych zadań.

OrderProcessorTest.javajava
// Test jednostkowy procesora
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {

    @Mock
    private PricingService pricingService;

    @Mock
    private ValidationService validationService;

    @InjectMocks
    private OrderItemProcessor processor;

    @Test
    void shouldProcessValidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(true);
        when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));

        // When
        ProcessedOrder result = processor.process(input);

        // Then
        assertThat(result).isNotNull();
        assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
    }

    @Test
    void shouldFilterInvalidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(false);

        // When
        ProcessedOrder result = processor.process(input);

        // Then - null oznacza odfiltrowane
        assertThat(result).isNull();
        verify(pricingService, never()).calculatePrice(any());
    }
}
ImportJobIntegrationTest.javajava
// Test integracyjny pełnego zadania
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @BeforeEach
    void setup() {
        // Czyści metadane między testami
        jobRepositoryTestUtils.removeJobExecutions();
        // Resetuje dane testowe
        jdbcTemplate.execute("DELETE FROM processed_orders");
        jdbcTemplate.execute("DELETE FROM orders");
    }

    @Test
    void shouldCompleteJobSuccessfully() throws Exception {
        // Given - dane testowe
        insertTestOrders(100);

        // When
        JobParameters params = new JobParametersBuilder()
                .addLocalDate("businessDate", LocalDate.now())
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncherTestUtils.launchJob(params);

        // Then
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(countProcessedOrders()).isEqualTo(100);
    }

    @Test
    void shouldHandleEmptyDataset() throws Exception {
        // Given - brak danych

        // When
        JobExecution execution = jobLauncherTestUtils.launchJob();

        // Then - zadanie odnosi sukces nawet bez danych
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    @Test
    void shouldRestartFromFailurePoint() throws Exception {
        // Given - symuluje błąd w środku przetwarzania
        insertTestOrders(100);
        insertPoisonOrder(50);  // Powoduje błąd

        // When - pierwsze wykonanie kończy się niepowodzeniem
        JobExecution firstExecution = jobLauncherTestUtils.launchJob();
        assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);

        // Naprawia dane
        removePoisonOrder(50);

        // When - restart
        JobExecution restartExecution = jobLauncherTestUtils.launchJob(
                firstExecution.getJobParameters());

        // Then - wznawia od punktu błędu
        assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    private void insertTestOrders(int count) {
        for (int i = 1; i <= count; i++) {
            jdbcTemplate.update(
                    "INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
                    i, i * 10, BigDecimal.valueOf(i * 10));
        }
    }

    private int countProcessedOrders() {
        return jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processed_orders", Integer.class);
    }
}

Pytanie 15: Jak zoptymalizować wydajność zapisu do bazy danych?

Zapis często staje się wąskim gardłem. Optymalizacje obejmują: batch inserty JDBC, wyłączenie ograniczeń podczas ładowania i użycie tabel staging.

OptimizedJdbcWriter.javajava
// Writer zoptymalizowany dla dużych wolumenów
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;
    private final DataSource dataSource;

    public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
        this.jdbcTemplate = jdbcTemplate;
        this.dataSource = dataSource;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
        List<? extends ProcessedOrder> items = chunk.getItems();

        if (items.isEmpty()) {
            return;
        }

        // Używa PreparedStatement z batch
        try (Connection connection = dataSource.getConnection();
             PreparedStatement ps = connection.prepareStatement(
                     "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
                             "VALUES (?, ?, ?, ?)")) {

            for (ProcessedOrder order : items) {
                ps.setLong(1, order.orderId());
                ps.setLong(2, order.customerId());
                ps.setBigDecimal(3, order.finalPrice());
                ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                ps.addBatch();
            }

            // Wykonuje wszystkie inserty w jednej operacji sieciowej
            ps.executeBatch();
        }
    }
}

// StagingTableWriter.java
// Wzorzec staging table dla bardzo dużych wolumenów
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {

    private final JdbcTemplate jdbcTemplate;
    private String stagingTable;

    public StagingTableWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Tworzy tymczasową tabelę dla tego stepu
        stagingTable = "staging_orders_" + stepExecution.getId();
        jdbcTemplate.execute(
                "CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // Zapisuje do tabeli staging (bez ograniczeń FK)
        String sql = "INSERT INTO " + stagingTable +
                " (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";

        jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                });
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
            // Bulk copy do tabeli docelowej
            jdbcTemplate.execute(
                    "INSERT INTO processed_orders SELECT * FROM " + stagingTable);
        }
        // Czyści tabelę staging
        jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
        return stepExecution.getExitStatus();
    }
}

Podsumowanie

Opanowanie Spring Batch 5 na rozmowach technicznych opiera się na dogłębnym zrozumieniu mechanizmów wewnętrznych:

Architektura: Job → Step → Chunk (Reader, Processor, Writer)

Przetwarzanie chunkowe: dobór rozmiaru, cykl życia, transakcje

Partycjonowanie: lokalne vs zdalne, równoważenie partycji

Tolerancja błędów: skip, retry, restart z odpowiednią polityką

Skalowanie: multi-threading, parallel steps, remote chunking

Testy: jednostkowe, integracyjne, end-to-end

Optymalizacja: batch writes, tabele staging, monitorowanie

Zaawansowane pytania oceniają umiejętność uzasadniania wyborów architektonicznych w zależności od kontekstu: wolumen danych, ograniczenia czasowe, tolerancja błędów i dostępna infrastruktura.

Zacznij ćwiczyć!

Sprawdź swoją wiedzę z naszymi symulatorami rozmów i testami technicznymi.

Tagi

#spring batch
#spring boot
#java
#batch processing
#interview questions

Udostępnij

Powiązane artykuły