Spring Batch 5 Interview: Partitioning, Chunks und Fehlertoleranz

Meistern Sie Spring Batch 5 Interviews: 15 essenzielle Fragen zu Partitioning, Chunk-Verarbeitung und Fehlertoleranz mit Java 21 Codebeispielen.

Spring Batch 5 Interview: Partitioning, Chunks und Fehlertoleranz

Spring Batch 5 ist ein zentraler Baustein für die Verarbeitung großer Datenmengen im Spring-Ökosystem. Technische Interviews bewerten die Fähigkeit, robuste, skalierbare und fehlertolerante Jobs zu entwerfen. Die Beherrschung von Partitioning, Chunk-Oriented Processing und Fehlertoleranzmechanismen unterscheidet erfahrene Entwickler.

Schwerpunkt im Interview

Recruiter prüfen tiefes Verständnis: Warum Partitioning statt Remote Chunking? Wie dimensioniert man Chunks korrekt? Solche Architekturentscheidungen offenbaren echte Produktionserfahrung.

Grundarchitektur von Spring Batch 5

Frage 1: Was sind die Hauptkomponenten von Spring Batch?

Die Spring Batch Architektur basiert auf drei Schichten: Application (Jobs und Geschäftslogik), Batch Core (Runtime-Klassen zum Starten und Steuern von Jobs) und Infrastructure (gemeinsame Reader, Writer und Services wie RetryTemplate).

BatchJobConfig.javajava
// Spring Batch 5 Job-Konfiguration mit Java 21
@Configuration
public class BatchJobConfig {

    // JobRepository speichert die Ausführungs-Metadaten
    // Ermöglicht Restart und Job-Monitoring
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public BatchJobConfig(JobRepository jobRepository,
                          PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Ein Job kapselt den vollständigen Batch-Prozess
    // Besteht aus einem oder mehreren sequentiell ausgeführten Steps
    @Bean
    public Job importUserJob(Step processUsersStep, Step cleanupStep) {
        return new JobBuilder("importUserJob", jobRepository)
                .start(processUsersStep)      // Haupt-Verarbeitungs-Step
                .next(cleanupStep)             // Aufräum-Step
                .build();
    }

    // Ein Step repräsentiert eine unabhängige Arbeitseinheit
    // Zwei Modelle: Tasklet (einzelne Aufgabe) oder Chunk (iterative Verarbeitung)
    @Bean
    public Step processUsersStep(ItemReader<UserRecord> reader,
                                  ItemProcessor<UserRecord, User> processor,
                                  ItemWriter<User> writer) {
        return new StepBuilder("processUsersStep", jobRepository)
                .<UserRecord, User>chunk(100, transactionManager)  // Commit alle 100 Items
                .reader(reader)       // Liest Quelldaten
                .processor(processor) // Transformiert jedes Item
                .writer(writer)       // Schreibt in 100er-Batches
                .build();
    }
}

Das JobRepository persistiert den Ausführungsstatus in der Datenbank. Diese Persistenz ermöglicht es, einen fehlgeschlagenen Job genau dort fortzusetzen, wo er gestoppt wurde, ohne bereits committete Daten erneut zu verarbeiten.

Frage 2: Was ist der Unterschied zwischen Tasklet und Chunk-orientierter Verarbeitung?

Tasklet führt eine diskrete, nicht-iterative Aktion aus: Datei löschen, Stored Procedure aufrufen, Benachrichtigungs-E-Mail senden. Chunk verarbeitet große Volumina, indem die Daten in handhabbare Batches aufgeteilt werden.

CleanupTasklet.javajava
// Tasklet: einzelne Aktion ohne Iteration
@Component
public class CleanupTasklet implements Tasklet {

    private final Path tempDirectory = Path.of("/tmp/batch-work");

    @Override
    public RepeatStatus execute(StepContribution contribution,
                                 ChunkContext chunkContext) throws Exception {
        // Löscht alle temporären Dateien der Verarbeitung
        try (var files = Files.walk(tempDirectory)) {
            files.filter(Files::isRegularFile)
                 .forEach(this::deleteQuietly);
        }

        // FINISHED zeigt an, dass das Tasklet seine Arbeit abgeschlossen hat
        // CONTINUABLE würde die Ausführung neu starten (nützlich für Polling)
        return RepeatStatus.FINISHED;
    }

    private void deleteQuietly(Path file) {
        try {
            Files.delete(file);
        } catch (IOException e) {
            // Loggen und fortfahren - Job nicht wegen einer Datei abbrechen
        }
    }
}
ChunkProcessingConfig.javajava
// Chunk-Verarbeitung: Hochvolumige Verarbeitung
@Configuration
public class ChunkProcessingConfig {

    @Bean
    public Step processOrdersStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<OrderRecord> reader,
                                   ItemProcessor<OrderRecord, ProcessedOrder> processor,
                                   ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("processOrdersStep", jobRepository)
                // Chunk von 500: liest 500 Items, verarbeitet, schreibt, committet
                .<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Listener zur Fortschrittsüberwachung
                .listener(new ChunkProgressListener())
                .build();
    }
}

Chunk-orientierte Verarbeitung bietet entscheidende Vorteile: optimiertes Speichermanagement (nur der aktuelle Chunk im Speicher), granulare Transaktionen (Commit pro Chunk) und Fehlerwiederherstellung beim letzten committeten Chunk.

Vertiefung der Chunk-orientierten Verarbeitung

Frage 3: Wie funktioniert der Chunk-Lebenszyklus?

Jeder Chunk folgt einem präzisen Zyklus: Lesen der Items einzeln bis zur konfigurierten Größe, individuelle Verarbeitung jedes Items, dann Schreiben der Gruppe. Eine Transaktion umschließt den gesamten Chunk.

OrderItemReader.javajava
// ItemReader: liest ein Item nach dem anderen
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {

    // @StepScope: neue Instanz pro Step-Ausführung
    // Ermöglicht das Injizieren dynamischer Job-Parameter
    @Value("#{jobParameters['startDate']}")
    private LocalDate startDate;

    private Iterator<OrderRecord> orderIterator;

    @BeforeStep
    public void initializeReader(StepExecution stepExecution) {
        // Lädt Daten beim Start des Steps
        List<OrderRecord> orders = fetchOrdersFromDate(startDate);
        this.orderIterator = orders.iterator();
    }

    @Override
    public OrderRecord read() {
        // Gibt null zurück, um das Datenende zu signalisieren
        // Spring Batch ruft read() auf, bis null zurückkommt
        if (orderIterator.hasNext()) {
            return orderIterator.next();
        }
        return null;  // Ende des Datasets
    }

    private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
        // Holt Daten aus der Datenquelle
        return List.of();  // Tatsächliche Implementierung
    }
}
OrderItemProcessor.javajava
// ItemProcessor: transformiert jedes Item einzeln
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {

    private final PricingService pricingService;
    private final ValidationService validationService;

    public OrderItemProcessor(PricingService pricingService,
                               ValidationService validationService) {
        this.pricingService = pricingService;
        this.validationService = validationService;
    }

    @Override
    public ProcessedOrder process(OrderRecord item) {
        // null zurückgeben filtert das Item (wird nicht geschrieben)
        if (!validationService.isValid(item)) {
            return null;  // Item gefiltert
        }

        // Geschäftliche Transformation
        BigDecimal finalPrice = pricingService.calculatePrice(item);

        return new ProcessedOrder(
                item.orderId(),
                item.customerId(),
                finalPrice,
                LocalDateTime.now()
        );
    }
}
OrderItemWriter.javajava
// ItemWriter: schreibt den vollständigen Chunk in einer Operation
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;

    public OrderItemWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // Der Chunk enthält alle verarbeiteten Items
        // Batch-Schreiben für optimierte Performance
        List<? extends ProcessedOrder> items = chunk.getItems();

        jdbcTemplate.batchUpdate(
                "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
                items,
                items.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                }
        );
    }
}

Tritt während der Chunk-Verarbeitung eine Exception auf, wird die Transaktion zurückgerollt. Der Job kann dann von diesem Chunk aus fortgesetzt werden, indem die im JobRepository gespeicherten Metadaten verwendet werden.

Frage 4: Wie wählt man die optimale Chunk-Größe?

Die Chunk-Größe beeinflusst Performance und Speicherverbrauch direkt. Ein zu kleiner Chunk multipliziert die Commits (Overhead). Ein zu großer Chunk verbraucht zu viel Speicher und verlängert Rollbacks bei Fehlern.

ChunkSizingConfig.javajava
// Dynamische Konfiguration der Chunk-Größe
@Configuration
public class ChunkSizingConfig {

    // Sinnvoller Standardwert für die meisten Fälle
    private static final int DEFAULT_CHUNK_SIZE = 100;

    // Für leichte Items (wenige Felder)
    private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;

    // Für schwere Items (Blobs, Dokumente)
    private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;

    @Bean
    public Step processLightDataStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<LightRecord> reader,
                                      ItemWriter<LightRecord> writer) {
        return new StepBuilder("processLightDataStep", jobRepository)
                // Leichte Items: größere Chunks für weniger Commits
                .<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .writer(writer)
                .build();
    }

    @Bean
    public Step processDocumentsStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<Document> reader,
                                      ItemProcessor<Document, ProcessedDocument> processor,
                                      ItemWriter<ProcessedDocument> writer) {
        return new StepBuilder("processDocumentsStep", jobRepository)
                // Schwere Dokumente: kleinere Chunks zur Speicherbegrenzung
                .<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
Faustregel

Mit 100 Items pro Chunk beginnen, dann anhand von Metriken anpassen: Commit-Zeit, Speichernutzung und Rollback-Dauer. Listener nutzen, um den optimalen Wert zu identifizieren.

Partitioning für parallele Verarbeitung

Frage 5: Was ist Partitioning und wann sollte man es einsetzen?

Partitioning teilt einen Datensatz in unabhängige, parallel verarbeitete Partitionen auf. Jede Partition läuft in einem eigenen Thread (lokal) oder auf einem Remote-Worker. Dieser Ansatz vervielfacht den Durchsatz, ohne die Restart-Fähigkeit zu opfern.

PartitionedJobConfig.javajava
// Konfiguration eines partitionierten Jobs
@Configuration
public class PartitionedJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public PartitionedJobConfig(JobRepository jobRepository,
                                 PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    @Bean
    public Job partitionedImportJob(Step partitionedStep) {
        return new JobBuilder("partitionedImportJob", jobRepository)
                .start(partitionedStep)
                .build();
    }

    // Manager-Step: orchestriert die Partitionen
    @Bean
    public Step partitionedStep(Partitioner partitioner,
                                 Step workerStep,
                                 TaskExecutor taskExecutor) {
        return new StepBuilder("partitionedStep", jobRepository)
                // Teilt die Arbeit über den Partitioner
                .partitioner("workerStep", partitioner)
                // Step, der für jede Partition ausgeführt wird
                .step(workerStep)
                // 8 parallele Threads
                .taskExecutor(taskExecutor)
                // Anzahl zu erstellender Partitionen
                .gridSize(8)
                .build();
    }

    // TaskExecutor für parallele Ausführung
    @Bean
    public TaskExecutor batchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("batch-partition-");
        executor.initialize();
        return executor;
    }
}
RangePartitioner.javajava
// Partitioner basierend auf ID-Bereichen
@Component
public class RangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public RangePartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Ermittelt die Grenzen des Datasets
        Long minId = jdbcTemplate.queryForObject(
                "SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
        Long maxId = jdbcTemplate.queryForObject(
                "SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);

        if (minId == null || maxId == null) {
            return Map.of();  // Keine Daten zu verarbeiten
        }

        // Berechnet die Größe jeder Partition
        long range = (maxId - minId) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new HashMap<>();

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            long start = minId + (i * range);
            long end = Math.min(start + range - 1, maxId);

            // Jede Partition erhält ihre Grenzen
            context.putLong("minId", start);
            context.putLong("maxId", end);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

Partitioning eignet sich für große Datasets, bei denen Items unabhängig sind. Partitionen müssen ausbalanciert sein, um zu verhindern, dass eine langsame Partition den gesamten Job ausbremst.

Frage 6: Worin unterscheiden sich lokales und Remote-Partitioning?

Lokales Partitioning führt alle Partitionen in derselben JVM mit einem Thread-Pool aus. Remote-Partitioning verteilt die Partitionen über mehrere JVMs (Worker) per Messaging-Middleware.

RemotePartitioningConfig.javajava
// Remote-Partitioning-Konfiguration mit Messaging
@Configuration
public class RemotePartitioningConfig {

    @Bean
    public Step managerStep(JobRepository jobRepository,
                             Partitioner partitioner,
                             MessageChannelPartitionHandler partitionHandler) {
        return new StepBuilder("managerStep", jobRepository)
                .partitioner("workerStep", partitioner)
                // Handler kommuniziert mit Remote-Workern
                .partitionHandler(partitionHandler)
                .build();
    }

    // PartitionHandler sendet ExecutionContexts an Worker
    @Bean
    public MessageChannelPartitionHandler partitionHandler(
            MessagingTemplate messagingTemplate,
            JobExplorer jobExplorer) {
        MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
        handler.setStepName("workerStep");
        handler.setGridSize(4);
        handler.setMessagingOperations(messagingTemplate);
        handler.setJobExplorer(jobExplorer);
        // Timeout für die Wartezeit auf Worker
        handler.setPollInterval(5000L);
        return handler;
    }
}
WorkerConfiguration.javajava
// Konfiguration auf Worker-Seite
@Configuration
public class WorkerConfiguration {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public WorkerConfiguration(JobRepository jobRepository,
                                PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Worker empfängt Partitionen und führt den Step aus
    @Bean
    public Step workerStep(ItemReader<OrderRecord> reader,
                            ItemProcessor<OrderRecord, ProcessedOrder> processor,
                            ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("workerStep", jobRepository)
                .<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
                // Reader mit @StepScope für Partition-Parameter
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    // Reader, der Partitionsgrenzen verwendet
    @Bean
    @StepScope
    public JdbcCursorItemReader<OrderRecord> partitionedReader(
            DataSource dataSource,
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {
        return new JdbcCursorItemReaderBuilder<OrderRecord>()
                .name("partitionedOrderReader")
                .dataSource(dataSource)
                .sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
                .preparedStatementSetter(ps -> {
                    ps.setLong(1, minId);
                    ps.setLong(2, maxId);
                })
                .rowMapper(new OrderRecordRowMapper())
                .build();
    }
}

Bereit für deine Spring Boot-Interviews?

Übe mit unseren interaktiven Simulatoren, Flashcards und technischen Tests.

Fehlertoleranz und Fehlerbehandlung

Frage 7: Welche Fehlertoleranzmechanismen bietet Spring Batch?

Spring Batch bietet drei komplementäre Mechanismen: Skip (fehlerhafte Items überspringen), Retry (automatisch erneut versuchen) und Restart (fehlgeschlagenen Job fortsetzen). Diese Mechanismen werden auf Step-Ebene konfiguriert.

FaultTolerantStepConfig.javajava
// Vollständige Fehlertoleranz-Konfiguration
@Configuration
public class FaultTolerantStepConfig {

    @Bean
    public Step faultTolerantStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<DataRecord> reader,
                                   ItemProcessor<DataRecord, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   SkipPolicy customSkipPolicy) {
        return new StepBuilder("faultTolerantStep", jobRepository)
                .<DataRecord, ProcessedRecord>chunk(100, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Aktiviert den fehlertoleranten Modus
                .faultTolerant()
                // SKIP: ignoriert bis zu 10 Validierungsfehler
                .skipLimit(10)
                .skip(ValidationException.class)
                .skip(DataIntegrityViolationException.class)
                // Manche Fehler dürfen nie übersprungen werden
                .noSkip(FatalBatchException.class)
                // RETRY: wiederholt vorübergehende Fehler
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .retry(DeadlockLoserDataAccessException.class)
                // Exponentieller Backoff zwischen Retries
                .backOffPolicy(exponentialBackOffPolicy())
                // Listener zum Loggen der Skips
                .listener(skipListener())
                .build();
    }

    @Bean
    public BackOffPolicy exponentialBackOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);  // 1 Sekunde
        policy.setMultiplier(2.0);         // Verdoppelt sich pro Retry
        policy.setMaxInterval(10000);      // Maximal 10 Sekunden
        return policy;
    }

    @Bean
    public SkipListener<DataRecord, ProcessedRecord> skipListener() {
        return new SkipListener<>() {
            @Override
            public void onSkipInRead(Throwable t) {
                // Loggt unlesbares Item
            }

            @Override
            public void onSkipInProcess(DataRecord item, Throwable t) {
                // Loggt Item, das bei der Verarbeitung fehlschlug
            }

            @Override
            public void onSkipInWrite(ProcessedRecord item, Throwable t) {
                // Loggt Item, das beim Schreiben fehlschlug
            }
        };
    }
}

Retry eignet sich für vorübergehende Fehler (Netzwerk-Timeout, DB-Deadlock). Skip eignet sich für individuelle Datenfehler, die die Gesamtverarbeitung nicht blockieren sollen.

Frage 8: Wie implementiert man eine eigene SkipPolicy?

Eine eigene SkipPolicy ermöglicht feingranulare Entscheidungslogik: Skip nach Exception-Typ, Fehleranzahl oder spezifischen Geschäftskriterien.

AdaptiveSkipPolicy.javajava
// SkipPolicy mit fortgeschrittener Geschäftslogik
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {

    private static final int MAX_SKIP_COUNT = 100;
    private static final double MAX_SKIP_PERCENTAGE = 0.05;  // Max 5%

    private final AtomicInteger totalProcessed = new AtomicInteger(0);
    private final AtomicInteger skipCount = new AtomicInteger(0);

    @Override
    public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
        // Fatale Fehler nie überspringen
        if (exception instanceof FatalBatchException
                || exception instanceof OutOfMemoryError) {
            return false;
        }

        // Absolutes Skip-Limit
        if (skipCountSoFar >= MAX_SKIP_COUNT) {
            return false;  // Job stoppen
        }

        // Prozentuales Limit
        int total = totalProcessed.get();
        if (total > 1000) {  // Erst nach Aufwärmphase anwenden
            double skipPercentage = (double) skipCountSoFar / total;
            if (skipPercentage > MAX_SKIP_PERCENTAGE) {
                return false;  // Anteilig zu viele Fehler
            }
        }

        // Validierungs- und Datenfehler überspringen
        return exception instanceof ValidationException
                || exception instanceof DataFormatException
                || exception instanceof IllegalArgumentException;
    }

    // Wird von einem Listener zur Fortschrittsverfolgung aufgerufen
    public void incrementProcessed() {
        totalProcessed.incrementAndGet();
    }
}

Frage 9: Wie funktioniert der Restart eines fehlgeschlagenen Jobs?

Das JobRepository speichert den Zustand jeder Ausführung. Beim Restart identifiziert Spring Batch den letzten committeten Chunk und nimmt von diesem Punkt an wieder auf. Erfolgreich verarbeitete Items werden nicht erneut verarbeitet.

JobRestartService.javajava
// Service zur Verwaltung von Job-Restarts
@Service
public class JobRestartService {

    private final JobLauncher jobLauncher;
    private final JobExplorer jobExplorer;
    private final JobRepository jobRepository;
    private final Job importJob;

    public JobRestartService(JobLauncher jobLauncher,
                              JobExplorer jobExplorer,
                              JobRepository jobRepository,
                              @Qualifier("importJob") Job importJob) {
        this.jobLauncher = jobLauncher;
        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
        this.importJob = importJob;
    }

    public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
        // Holt die fehlgeschlagene Ausführung
        JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);

        if (failedExecution == null) {
            throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
        }

        // Prüft, ob der Job neu gestartet werden kann
        if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
            throw new IllegalStateException("Only FAILED jobs can be restarted");
        }

        // Verwendet die gleichen Parameter wie die ursprüngliche Ausführung
        JobParameters originalParams = failedExecution.getJobParameters();

        // Startet den Job neu - nimmt automatisch vom letzten Checkpoint wieder auf
        return jobLauncher.run(importJob, originalParams);
    }

    public List<JobExecution> findRestartableJobs() {
        // Listet alle FAILED-Ausführungen, die noch nicht neu gestartet wurden
        return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.FAILED)
                .filter(this::isRestartable)
                .toList();
    }

    private boolean isRestartable(JobExecution execution) {
        // Prüft, dass keine neuere erfolgreiche Ausführung existiert
        JobInstance instance = execution.getJobInstance();
        return jobExplorer.getJobExecutions(instance).stream()
                .noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
    }
}
Interview-Falle

Ein Job kann nur neu gestartet werden, wenn die JobParameters identisch sind. Eine Parameteränderung erzeugt eine neue Job-Instanz und löscht den Fortschrittsverlauf.

Skalierung und Optimierung

Frage 10: Welche Skalierungsstrategien stehen zur Verfügung?

Spring Batch bietet vier Strategien: Multi-threaded Step (mehrere Threads lesen parallel), Parallel Steps (unabhängige Steps parallel), Remote Chunking (verteilte Verarbeitung) und Partitioning (verteilte Daten).

MultiThreadedStepConfig.javajava
// Multi-threaded Step: mehrere Threads verarbeiten denselben Datensatz
@Configuration
public class MultiThreadedStepConfig {

    @Bean
    public Step multiThreadedStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<Record> reader,
                                   ItemProcessor<Record, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   TaskExecutor taskExecutor) {
        return new StepBuilder("multiThreadedStep", jobRepository)
                .<Record, ProcessedRecord>chunk(100, transactionManager)
                // ACHTUNG: Reader muss thread-safe sein
                .reader(synchronizedReader(reader))
                .processor(processor)
                .writer(writer)
                // 4 Threads verarbeiten Chunks parallel
                .taskExecutor(taskExecutor)
                .throttleLimit(4)
                .build();
    }

    // Wrapper, um den Reader thread-safe zu machen
    private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
        SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
        syncReader.setDelegate((ItemStreamReader<Record>) reader);
        return syncReader;
    }
}
ParallelStepsConfig.javajava
// Ausführung unabhängiger Steps parallel
@Configuration
public class ParallelStepsConfig {

    @Bean
    public Job parallelJob(JobRepository jobRepository,
                            Step loadCustomersStep,
                            Step loadProductsStep,
                            Step loadOrdersStep,
                            Step processDataStep) {
        // Paralleler Flow: Customers und Products gleichzeitig geladen
        Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
                .start(loadCustomersStep)
                .build();

        Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
                .start(loadProductsStep)
                .build();

        Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
                .start(loadOrdersStep)
                .build();

        // Split führt die Flows parallel aus
        return new JobBuilder("parallelJob", jobRepository)
                .start(new FlowBuilder<Flow>("parallelLoadFlow")
                        .split(new SimpleAsyncTaskExecutor())
                        .add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
                        .build())
                // Nach parallelem Laden: sequentielle Verarbeitung
                .next(processDataStep)
                .build()
                .build();
    }
}

Multi-Threading eignet sich, wenn der Reader synchronisiert werden kann. Partitioning ist für große Volumina vorzuziehen, da jede Partition ihren eigenen Reader ohne Konkurrenz hat.

Frage 11: Wie überwacht man die Performance eines Jobs?

Spring Batch stellt Metriken über Listener und JobRepository bereit. Die Integration mit Micrometer ermöglicht den Export nach Prometheus, Grafana oder andere Monitoring-Systeme.

BatchMetricsConfig.javajava
// Monitoring-Konfiguration mit Micrometer
@Configuration
public class BatchMetricsConfig {

    private final MeterRegistry meterRegistry;

    public BatchMetricsConfig(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Bean
    public JobExecutionListener metricsJobListener() {
        return new JobExecutionListener() {

            private Timer.Sample jobTimer;

            @Override
            public void beforeJob(JobExecution jobExecution) {
                // Startet den Job-Dauer-Timer
                jobTimer = Timer.start(meterRegistry);
                Counter.builder("batch.job.started")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .register(meterRegistry)
                        .increment();
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                // Erfasst die Gesamtdauer
                jobTimer.stop(Timer.builder("batch.job.duration")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry));

                // Job-Counter nach Status
                Counter.builder("batch.job.completed")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry)
                        .increment();
            }
        };
    }

    @Bean
    public StepExecutionListener metricsStepListener() {
        return new StepExecutionListener() {

            @Override
            public void afterStep(StepExecution stepExecution) {
                String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
                String stepName = stepExecution.getStepName();

                // Throughput-Metriken
                Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                return null;
            }
        };
    }
}

Frage 12: Welche typischen Fallstricke gibt es beim Partitioning?

Häufige Fehler sind: unausgewogene Partitionen (eine Partition enthält 90% der Daten), nicht thread-safe Reader und falsche Statusverwaltung zwischen Partitionen.

BalancedPartitioner.javajava
// Partitioner, der die Last tatsächlich ausgleicht
@Component
public class BalancedPartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Zählt die Gesamtanzahl der zu verarbeitenden Items
        Integer totalCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);

        if (totalCount == null || totalCount == 0) {
            return Map.of();
        }

        // Berechnet die Zielgröße pro Partition
        int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);

        Map<String, ExecutionContext> partitions = new HashMap<>();

        // Verwendet OFFSET/LIMIT für ausgeglichene Partitionen
        // Teurer als Ranges, garantiert aber den Ausgleich
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("offset", i * itemsPerPartition);
            context.putInt("limit", itemsPerPartition);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

// OffsetBasedReader.java
// Reader kompatibel mit Offset-basiertem Partitioning
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {

    private final JdbcTemplate jdbcTemplate;
    private Iterator<OrderRecord> iterator;

    @Value("#{stepExecutionContext['offset']}")
    private int offset;

    @Value("#{stepExecutionContext['limit']}")
    private int limit;

    public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // Lädt genau den dieser Partition zugewiesenen Anteil
        List<OrderRecord> records = jdbcTemplate.query(
                "SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
                new OrderRecordRowMapper(),
                limit, offset
        );
        this.iterator = records.iterator();
    }

    @Override
    public OrderRecord read() {
        return iterator.hasNext() ? iterator.next() : null;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // Statusspeicherung für Restart, falls nötig
    }

    @Override
    public void close() {
        // Aufräumen
    }
}

Fortgeschrittene Fragen für Senioren

Frage 13: Wie verwaltet man Abhängigkeiten zwischen Jobs?

Spring Batch verwaltet Abhängigkeiten zwischen Jobs nicht nativ. Lösungen sind: externe Orchestratoren (Airflow, Kubernetes CronJob) oder eigene Implementierung mit JobExplorer.

JobDependencyService.javajava
// Verwaltung von Job-Abhängigkeiten
@Service
public class JobDependencyService {

    private final JobExplorer jobExplorer;
    private final JobLauncher jobLauncher;
    private final Map<String, Job> jobs;

    public JobDependencyService(JobExplorer jobExplorer,
                                  JobLauncher jobLauncher,
                                  Map<String, Job> jobs) {
        this.jobExplorer = jobExplorer;
        this.jobLauncher = jobLauncher;
        this.jobs = jobs;
    }

    public JobExecution runWithDependencies(String jobName,
                                             JobParameters params,
                                             List<String> dependsOn) throws Exception {
        // Prüft, dass alle Abhängigkeiten erfolgreich waren
        for (String dependency : dependsOn) {
            if (!hasSuccessfulExecution(dependency, params)) {
                throw new JobExecutionException(
                        "Dependency not satisfied: " + dependency);
            }
        }

        Job job = jobs.get(jobName);
        if (job == null) {
            throw new IllegalArgumentException("Unknown job: " + jobName);
        }

        return jobLauncher.run(job, params);
    }

    private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
        // Sucht eine COMPLETED-Ausführung mit denselben Geschäftsparametern
        return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
                .anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
    }

    private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
        // Vergleicht Geschäftsparameter (ignoriert Ausführungs-Timestamps)
        String actualDate = actual.getString("businessDate");
        String expectedDate = expected.getString("businessDate");
        return Objects.equals(actualDate, expectedDate);
    }
}

Frage 14: Wie testet man einen Spring Batch Job effektiv?

Das Testen von Spring Batch Jobs erfordert einen mehrschichtigen Ansatz: Unit-Tests für Komponenten (Reader, Processor, Writer), Integrationstests für Steps und End-to-End-Tests für vollständige Jobs.

OrderProcessorTest.javajava
// Unit-Test des Processors
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {

    @Mock
    private PricingService pricingService;

    @Mock
    private ValidationService validationService;

    @InjectMocks
    private OrderItemProcessor processor;

    @Test
    void shouldProcessValidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(true);
        when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));

        // When
        ProcessedOrder result = processor.process(input);

        // Then
        assertThat(result).isNotNull();
        assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
    }

    @Test
    void shouldFilterInvalidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(false);

        // When
        ProcessedOrder result = processor.process(input);

        // Then - null bedeutet gefiltert
        assertThat(result).isNull();
        verify(pricingService, never()).calculatePrice(any());
    }
}
ImportJobIntegrationTest.javajava
// Integrationstest des vollständigen Jobs
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @BeforeEach
    void setup() {
        // Säubert Metadaten zwischen Tests
        jobRepositoryTestUtils.removeJobExecutions();
        // Setzt Testdaten zurück
        jdbcTemplate.execute("DELETE FROM processed_orders");
        jdbcTemplate.execute("DELETE FROM orders");
    }

    @Test
    void shouldCompleteJobSuccessfully() throws Exception {
        // Given - Testdaten
        insertTestOrders(100);

        // When
        JobParameters params = new JobParametersBuilder()
                .addLocalDate("businessDate", LocalDate.now())
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncherTestUtils.launchJob(params);

        // Then
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(countProcessedOrders()).isEqualTo(100);
    }

    @Test
    void shouldHandleEmptyDataset() throws Exception {
        // Given - keine Daten

        // When
        JobExecution execution = jobLauncherTestUtils.launchJob();

        // Then - Job ist auch ohne Daten erfolgreich
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    @Test
    void shouldRestartFromFailurePoint() throws Exception {
        // Given - simuliert Fehler mitten in der Verarbeitung
        insertTestOrders(100);
        insertPoisonOrder(50);  // Verursacht einen Fehler

        // When - erste Ausführung schlägt fehl
        JobExecution firstExecution = jobLauncherTestUtils.launchJob();
        assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);

        // Daten korrigieren
        removePoisonOrder(50);

        // When - Restart
        JobExecution restartExecution = jobLauncherTestUtils.launchJob(
                firstExecution.getJobParameters());

        // Then - nimmt vom Fehlerpunkt wieder auf
        assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    private void insertTestOrders(int count) {
        for (int i = 1; i <= count; i++) {
            jdbcTemplate.update(
                    "INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
                    i, i * 10, BigDecimal.valueOf(i * 10));
        }
    }

    private int countProcessedOrders() {
        return jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processed_orders", Integer.class);
    }
}

Frage 15: Wie optimiert man die Datenbank-Schreibperformance?

Das Schreiben wird oft zum Engpass. Optimierungen sind: JDBC Batch Inserts, Deaktivierung von Constraints während des Ladens und Verwendung von Staging-Tabellen.

OptimizedJdbcWriter.javajava
// Writer optimiert für hohe Volumina
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;
    private final DataSource dataSource;

    public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
        this.jdbcTemplate = jdbcTemplate;
        this.dataSource = dataSource;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
        List<? extends ProcessedOrder> items = chunk.getItems();

        if (items.isEmpty()) {
            return;
        }

        // Verwendet PreparedStatement mit Batch
        try (Connection connection = dataSource.getConnection();
             PreparedStatement ps = connection.prepareStatement(
                     "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
                             "VALUES (?, ?, ?, ?)")) {

            for (ProcessedOrder order : items) {
                ps.setLong(1, order.orderId());
                ps.setLong(2, order.customerId());
                ps.setBigDecimal(3, order.finalPrice());
                ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                ps.addBatch();
            }

            // Führt alle Inserts in einer einzigen Netzwerk-Operation aus
            ps.executeBatch();
        }
    }
}

// StagingTableWriter.java
// Staging-Table-Pattern für sehr hohe Volumina
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {

    private final JdbcTemplate jdbcTemplate;
    private String stagingTable;

    public StagingTableWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Erstellt eine temporäre Tabelle für diesen Step
        stagingTable = "staging_orders_" + stepExecution.getId();
        jdbcTemplate.execute(
                "CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // Schreibt in die Staging-Tabelle (ohne FK-Constraints)
        String sql = "INSERT INTO " + stagingTable +
                " (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";

        jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                });
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
            // Bulk Copy in die finale Tabelle
            jdbcTemplate.execute(
                    "INSERT INTO processed_orders SELECT * FROM " + stagingTable);
        }
        // Säubert die Staging-Tabelle
        jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
        return stepExecution.getExitStatus();
    }
}

Fazit

Die Beherrschung von Spring Batch 5 in technischen Interviews beruht auf einem tiefen Verständnis der internen Mechanismen:

Architektur: Job → Step → Chunk (Reader, Processor, Writer)

Chunk-Verarbeitung: Dimensionierung, Lebenszyklus, Transaktionen

Partitioning: lokal vs. remote, Partition-Balancing

Fehlertoleranz: Skip, Retry, Restart mit passender Policy

Skalierung: Multi-Threading, Parallel Steps, Remote Chunking

Tests: Unit, Integration, End-to-End

Optimierung: Batch Writes, Staging Tables, Monitoring

Fortgeschrittene Fragen prüfen die Fähigkeit, Architekturentscheidungen kontextabhängig zu rechtfertigen: Datenvolumen, Zeitvorgaben, Fehlertoleranz und verfügbare Infrastruktur.

Fang an zu üben!

Teste dein Wissen mit unseren Interview-Simulatoren und technischen Tests.

Tags

#spring batch
#spring boot
#java
#batch processing
#interview questions

Teilen

Verwandte Artikel