Spring Batch 5 Interview: Partitioning, Chunks und Fehlertoleranz
Meistern Sie Spring Batch 5 Interviews: 15 essenzielle Fragen zu Partitioning, Chunk-Verarbeitung und Fehlertoleranz mit Java 21 Codebeispielen.

Spring Batch 5 ist ein zentraler Baustein für die Verarbeitung großer Datenmengen im Spring-Ökosystem. Technische Interviews bewerten die Fähigkeit, robuste, skalierbare und fehlertolerante Jobs zu entwerfen. Die Beherrschung von Partitioning, Chunk-Oriented Processing und Fehlertoleranzmechanismen unterscheidet erfahrene Entwickler.
Recruiter prüfen tiefes Verständnis: Warum Partitioning statt Remote Chunking? Wie dimensioniert man Chunks korrekt? Solche Architekturentscheidungen offenbaren echte Produktionserfahrung.
Grundarchitektur von Spring Batch 5
Frage 1: Was sind die Hauptkomponenten von Spring Batch?
Die Spring Batch Architektur basiert auf drei Schichten: Application (Jobs und Geschäftslogik), Batch Core (Runtime-Klassen zum Starten und Steuern von Jobs) und Infrastructure (gemeinsame Reader, Writer und Services wie RetryTemplate).
// Spring Batch 5 Job-Konfiguration mit Java 21
@Configuration
public class BatchJobConfig {
// JobRepository speichert die Ausführungs-Metadaten
// Ermöglicht Restart und Job-Monitoring
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public BatchJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Ein Job kapselt den vollständigen Batch-Prozess
// Besteht aus einem oder mehreren sequentiell ausgeführten Steps
@Bean
public Job importUserJob(Step processUsersStep, Step cleanupStep) {
return new JobBuilder("importUserJob", jobRepository)
.start(processUsersStep) // Haupt-Verarbeitungs-Step
.next(cleanupStep) // Aufräum-Step
.build();
}
// Ein Step repräsentiert eine unabhängige Arbeitseinheit
// Zwei Modelle: Tasklet (einzelne Aufgabe) oder Chunk (iterative Verarbeitung)
@Bean
public Step processUsersStep(ItemReader<UserRecord> reader,
ItemProcessor<UserRecord, User> processor,
ItemWriter<User> writer) {
return new StepBuilder("processUsersStep", jobRepository)
.<UserRecord, User>chunk(100, transactionManager) // Commit alle 100 Items
.reader(reader) // Liest Quelldaten
.processor(processor) // Transformiert jedes Item
.writer(writer) // Schreibt in 100er-Batches
.build();
}
}Das JobRepository persistiert den Ausführungsstatus in der Datenbank. Diese Persistenz ermöglicht es, einen fehlgeschlagenen Job genau dort fortzusetzen, wo er gestoppt wurde, ohne bereits committete Daten erneut zu verarbeiten.
Frage 2: Was ist der Unterschied zwischen Tasklet und Chunk-orientierter Verarbeitung?
Tasklet führt eine diskrete, nicht-iterative Aktion aus: Datei löschen, Stored Procedure aufrufen, Benachrichtigungs-E-Mail senden. Chunk verarbeitet große Volumina, indem die Daten in handhabbare Batches aufgeteilt werden.
// Tasklet: einzelne Aktion ohne Iteration
@Component
public class CleanupTasklet implements Tasklet {
private final Path tempDirectory = Path.of("/tmp/batch-work");
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// Löscht alle temporären Dateien der Verarbeitung
try (var files = Files.walk(tempDirectory)) {
files.filter(Files::isRegularFile)
.forEach(this::deleteQuietly);
}
// FINISHED zeigt an, dass das Tasklet seine Arbeit abgeschlossen hat
// CONTINUABLE würde die Ausführung neu starten (nützlich für Polling)
return RepeatStatus.FINISHED;
}
private void deleteQuietly(Path file) {
try {
Files.delete(file);
} catch (IOException e) {
// Loggen und fortfahren - Job nicht wegen einer Datei abbrechen
}
}
}// Chunk-Verarbeitung: Hochvolumige Verarbeitung
@Configuration
public class ChunkProcessingConfig {
@Bean
public Step processOrdersStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("processOrdersStep", jobRepository)
// Chunk von 500: liest 500 Items, verarbeitet, schreibt, committet
.<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Listener zur Fortschrittsüberwachung
.listener(new ChunkProgressListener())
.build();
}
}Chunk-orientierte Verarbeitung bietet entscheidende Vorteile: optimiertes Speichermanagement (nur der aktuelle Chunk im Speicher), granulare Transaktionen (Commit pro Chunk) und Fehlerwiederherstellung beim letzten committeten Chunk.
Vertiefung der Chunk-orientierten Verarbeitung
Frage 3: Wie funktioniert der Chunk-Lebenszyklus?
Jeder Chunk folgt einem präzisen Zyklus: Lesen der Items einzeln bis zur konfigurierten Größe, individuelle Verarbeitung jedes Items, dann Schreiben der Gruppe. Eine Transaktion umschließt den gesamten Chunk.
// ItemReader: liest ein Item nach dem anderen
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {
// @StepScope: neue Instanz pro Step-Ausführung
// Ermöglicht das Injizieren dynamischer Job-Parameter
@Value("#{jobParameters['startDate']}")
private LocalDate startDate;
private Iterator<OrderRecord> orderIterator;
@BeforeStep
public void initializeReader(StepExecution stepExecution) {
// Lädt Daten beim Start des Steps
List<OrderRecord> orders = fetchOrdersFromDate(startDate);
this.orderIterator = orders.iterator();
}
@Override
public OrderRecord read() {
// Gibt null zurück, um das Datenende zu signalisieren
// Spring Batch ruft read() auf, bis null zurückkommt
if (orderIterator.hasNext()) {
return orderIterator.next();
}
return null; // Ende des Datasets
}
private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
// Holt Daten aus der Datenquelle
return List.of(); // Tatsächliche Implementierung
}
}// ItemProcessor: transformiert jedes Item einzeln
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {
private final PricingService pricingService;
private final ValidationService validationService;
public OrderItemProcessor(PricingService pricingService,
ValidationService validationService) {
this.pricingService = pricingService;
this.validationService = validationService;
}
@Override
public ProcessedOrder process(OrderRecord item) {
// null zurückgeben filtert das Item (wird nicht geschrieben)
if (!validationService.isValid(item)) {
return null; // Item gefiltert
}
// Geschäftliche Transformation
BigDecimal finalPrice = pricingService.calculatePrice(item);
return new ProcessedOrder(
item.orderId(),
item.customerId(),
finalPrice,
LocalDateTime.now()
);
}
}// ItemWriter: schreibt den vollständigen Chunk in einer Operation
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
public OrderItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Der Chunk enthält alle verarbeiteten Items
// Batch-Schreiben für optimierte Performance
List<? extends ProcessedOrder> items = chunk.getItems();
jdbcTemplate.batchUpdate(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
items,
items.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
}
);
}
}Tritt während der Chunk-Verarbeitung eine Exception auf, wird die Transaktion zurückgerollt. Der Job kann dann von diesem Chunk aus fortgesetzt werden, indem die im JobRepository gespeicherten Metadaten verwendet werden.
Frage 4: Wie wählt man die optimale Chunk-Größe?
Die Chunk-Größe beeinflusst Performance und Speicherverbrauch direkt. Ein zu kleiner Chunk multipliziert die Commits (Overhead). Ein zu großer Chunk verbraucht zu viel Speicher und verlängert Rollbacks bei Fehlern.
// Dynamische Konfiguration der Chunk-Größe
@Configuration
public class ChunkSizingConfig {
// Sinnvoller Standardwert für die meisten Fälle
private static final int DEFAULT_CHUNK_SIZE = 100;
// Für leichte Items (wenige Felder)
private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;
// Für schwere Items (Blobs, Dokumente)
private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;
@Bean
public Step processLightDataStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<LightRecord> reader,
ItemWriter<LightRecord> writer) {
return new StepBuilder("processLightDataStep", jobRepository)
// Leichte Items: größere Chunks für weniger Commits
.<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Step processDocumentsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<Document> reader,
ItemProcessor<Document, ProcessedDocument> processor,
ItemWriter<ProcessedDocument> writer) {
return new StepBuilder("processDocumentsStep", jobRepository)
// Schwere Dokumente: kleinere Chunks zur Speicherbegrenzung
.<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}Mit 100 Items pro Chunk beginnen, dann anhand von Metriken anpassen: Commit-Zeit, Speichernutzung und Rollback-Dauer. Listener nutzen, um den optimalen Wert zu identifizieren.
Partitioning für parallele Verarbeitung
Frage 5: Was ist Partitioning und wann sollte man es einsetzen?
Partitioning teilt einen Datensatz in unabhängige, parallel verarbeitete Partitionen auf. Jede Partition läuft in einem eigenen Thread (lokal) oder auf einem Remote-Worker. Dieser Ansatz vervielfacht den Durchsatz, ohne die Restart-Fähigkeit zu opfern.
// Konfiguration eines partitionierten Jobs
@Configuration
public class PartitionedJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public PartitionedJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
@Bean
public Job partitionedImportJob(Step partitionedStep) {
return new JobBuilder("partitionedImportJob", jobRepository)
.start(partitionedStep)
.build();
}
// Manager-Step: orchestriert die Partitionen
@Bean
public Step partitionedStep(Partitioner partitioner,
Step workerStep,
TaskExecutor taskExecutor) {
return new StepBuilder("partitionedStep", jobRepository)
// Teilt die Arbeit über den Partitioner
.partitioner("workerStep", partitioner)
// Step, der für jede Partition ausgeführt wird
.step(workerStep)
// 8 parallele Threads
.taskExecutor(taskExecutor)
// Anzahl zu erstellender Partitionen
.gridSize(8)
.build();
}
// TaskExecutor für parallele Ausführung
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("batch-partition-");
executor.initialize();
return executor;
}
}// Partitioner basierend auf ID-Bereichen
@Component
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Ermittelt die Grenzen des Datasets
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);
if (minId == null || maxId == null) {
return Map.of(); // Keine Daten zu verarbeiten
}
// Berechnet die Größe jeder Partition
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long start = minId + (i * range);
long end = Math.min(start + range - 1, maxId);
// Jede Partition erhält ihre Grenzen
context.putLong("minId", start);
context.putLong("maxId", end);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}Partitioning eignet sich für große Datasets, bei denen Items unabhängig sind. Partitionen müssen ausbalanciert sein, um zu verhindern, dass eine langsame Partition den gesamten Job ausbremst.
Frage 6: Worin unterscheiden sich lokales und Remote-Partitioning?
Lokales Partitioning führt alle Partitionen in derselben JVM mit einem Thread-Pool aus. Remote-Partitioning verteilt die Partitionen über mehrere JVMs (Worker) per Messaging-Middleware.
// Remote-Partitioning-Konfiguration mit Messaging
@Configuration
public class RemotePartitioningConfig {
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
MessageChannelPartitionHandler partitionHandler) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
// Handler kommuniziert mit Remote-Workern
.partitionHandler(partitionHandler)
.build();
}
// PartitionHandler sendet ExecutionContexts an Worker
@Bean
public MessageChannelPartitionHandler partitionHandler(
MessagingTemplate messagingTemplate,
JobExplorer jobExplorer) {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("workerStep");
handler.setGridSize(4);
handler.setMessagingOperations(messagingTemplate);
handler.setJobExplorer(jobExplorer);
// Timeout für die Wartezeit auf Worker
handler.setPollInterval(5000L);
return handler;
}
}// Konfiguration auf Worker-Seite
@Configuration
public class WorkerConfiguration {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public WorkerConfiguration(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Worker empfängt Partitionen und führt den Step aus
@Bean
public Step workerStep(ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("workerStep", jobRepository)
.<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
// Reader mit @StepScope für Partition-Parameter
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// Reader, der Partitionsgrenzen verwendet
@Bean
@StepScope
public JdbcCursorItemReader<OrderRecord> partitionedReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcCursorItemReaderBuilder<OrderRecord>()
.name("partitionedOrderReader")
.dataSource(dataSource)
.sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
.preparedStatementSetter(ps -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.rowMapper(new OrderRecordRowMapper())
.build();
}
}Bereit für deine Spring Boot-Interviews?
Übe mit unseren interaktiven Simulatoren, Flashcards und technischen Tests.
Fehlertoleranz und Fehlerbehandlung
Frage 7: Welche Fehlertoleranzmechanismen bietet Spring Batch?
Spring Batch bietet drei komplementäre Mechanismen: Skip (fehlerhafte Items überspringen), Retry (automatisch erneut versuchen) und Restart (fehlgeschlagenen Job fortsetzen). Diese Mechanismen werden auf Step-Ebene konfiguriert.
// Vollständige Fehlertoleranz-Konfiguration
@Configuration
public class FaultTolerantStepConfig {
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<DataRecord> reader,
ItemProcessor<DataRecord, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
SkipPolicy customSkipPolicy) {
return new StepBuilder("faultTolerantStep", jobRepository)
.<DataRecord, ProcessedRecord>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Aktiviert den fehlertoleranten Modus
.faultTolerant()
// SKIP: ignoriert bis zu 10 Validierungsfehler
.skipLimit(10)
.skip(ValidationException.class)
.skip(DataIntegrityViolationException.class)
// Manche Fehler dürfen nie übersprungen werden
.noSkip(FatalBatchException.class)
// RETRY: wiederholt vorübergehende Fehler
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(DeadlockLoserDataAccessException.class)
// Exponentieller Backoff zwischen Retries
.backOffPolicy(exponentialBackOffPolicy())
// Listener zum Loggen der Skips
.listener(skipListener())
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000); // 1 Sekunde
policy.setMultiplier(2.0); // Verdoppelt sich pro Retry
policy.setMaxInterval(10000); // Maximal 10 Sekunden
return policy;
}
@Bean
public SkipListener<DataRecord, ProcessedRecord> skipListener() {
return new SkipListener<>() {
@Override
public void onSkipInRead(Throwable t) {
// Loggt unlesbares Item
}
@Override
public void onSkipInProcess(DataRecord item, Throwable t) {
// Loggt Item, das bei der Verarbeitung fehlschlug
}
@Override
public void onSkipInWrite(ProcessedRecord item, Throwable t) {
// Loggt Item, das beim Schreiben fehlschlug
}
};
}
}Retry eignet sich für vorübergehende Fehler (Netzwerk-Timeout, DB-Deadlock). Skip eignet sich für individuelle Datenfehler, die die Gesamtverarbeitung nicht blockieren sollen.
Frage 8: Wie implementiert man eine eigene SkipPolicy?
Eine eigene SkipPolicy ermöglicht feingranulare Entscheidungslogik: Skip nach Exception-Typ, Fehleranzahl oder spezifischen Geschäftskriterien.
// SkipPolicy mit fortgeschrittener Geschäftslogik
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 100;
private static final double MAX_SKIP_PERCENTAGE = 0.05; // Max 5%
private final AtomicInteger totalProcessed = new AtomicInteger(0);
private final AtomicInteger skipCount = new AtomicInteger(0);
@Override
public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
// Fatale Fehler nie überspringen
if (exception instanceof FatalBatchException
|| exception instanceof OutOfMemoryError) {
return false;
}
// Absolutes Skip-Limit
if (skipCountSoFar >= MAX_SKIP_COUNT) {
return false; // Job stoppen
}
// Prozentuales Limit
int total = totalProcessed.get();
if (total > 1000) { // Erst nach Aufwärmphase anwenden
double skipPercentage = (double) skipCountSoFar / total;
if (skipPercentage > MAX_SKIP_PERCENTAGE) {
return false; // Anteilig zu viele Fehler
}
}
// Validierungs- und Datenfehler überspringen
return exception instanceof ValidationException
|| exception instanceof DataFormatException
|| exception instanceof IllegalArgumentException;
}
// Wird von einem Listener zur Fortschrittsverfolgung aufgerufen
public void incrementProcessed() {
totalProcessed.incrementAndGet();
}
}Frage 9: Wie funktioniert der Restart eines fehlgeschlagenen Jobs?
Das JobRepository speichert den Zustand jeder Ausführung. Beim Restart identifiziert Spring Batch den letzten committeten Chunk und nimmt von diesem Punkt an wieder auf. Erfolgreich verarbeitete Items werden nicht erneut verarbeitet.
// Service zur Verwaltung von Job-Restarts
@Service
public class JobRestartService {
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final Job importJob;
public JobRestartService(JobLauncher jobLauncher,
JobExplorer jobExplorer,
JobRepository jobRepository,
@Qualifier("importJob") Job importJob) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.importJob = importJob;
}
public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
// Holt die fehlgeschlagene Ausführung
JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);
if (failedExecution == null) {
throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
}
// Prüft, ob der Job neu gestartet werden kann
if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
throw new IllegalStateException("Only FAILED jobs can be restarted");
}
// Verwendet die gleichen Parameter wie die ursprüngliche Ausführung
JobParameters originalParams = failedExecution.getJobParameters();
// Startet den Job neu - nimmt automatisch vom letzten Checkpoint wieder auf
return jobLauncher.run(importJob, originalParams);
}
public List<JobExecution> findRestartableJobs() {
// Listet alle FAILED-Ausführungen, die noch nicht neu gestartet wurden
return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.FAILED)
.filter(this::isRestartable)
.toList();
}
private boolean isRestartable(JobExecution execution) {
// Prüft, dass keine neuere erfolgreiche Ausführung existiert
JobInstance instance = execution.getJobInstance();
return jobExplorer.getJobExecutions(instance).stream()
.noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
}
}Ein Job kann nur neu gestartet werden, wenn die JobParameters identisch sind. Eine Parameteränderung erzeugt eine neue Job-Instanz und löscht den Fortschrittsverlauf.
Skalierung und Optimierung
Frage 10: Welche Skalierungsstrategien stehen zur Verfügung?
Spring Batch bietet vier Strategien: Multi-threaded Step (mehrere Threads lesen parallel), Parallel Steps (unabhängige Steps parallel), Remote Chunking (verteilte Verarbeitung) und Partitioning (verteilte Daten).
// Multi-threaded Step: mehrere Threads verarbeiten denselben Datensatz
@Configuration
public class MultiThreadedStepConfig {
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<Record> reader,
ItemProcessor<Record, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
TaskExecutor taskExecutor) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Record, ProcessedRecord>chunk(100, transactionManager)
// ACHTUNG: Reader muss thread-safe sein
.reader(synchronizedReader(reader))
.processor(processor)
.writer(writer)
// 4 Threads verarbeiten Chunks parallel
.taskExecutor(taskExecutor)
.throttleLimit(4)
.build();
}
// Wrapper, um den Reader thread-safe zu machen
private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate((ItemStreamReader<Record>) reader);
return syncReader;
}
}// Ausführung unabhängiger Steps parallel
@Configuration
public class ParallelStepsConfig {
@Bean
public Job parallelJob(JobRepository jobRepository,
Step loadCustomersStep,
Step loadProductsStep,
Step loadOrdersStep,
Step processDataStep) {
// Paralleler Flow: Customers und Products gleichzeitig geladen
Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
.start(loadCustomersStep)
.build();
Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
.start(loadProductsStep)
.build();
Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
.start(loadOrdersStep)
.build();
// Split führt die Flows parallel aus
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("parallelLoadFlow")
.split(new SimpleAsyncTaskExecutor())
.add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
.build())
// Nach parallelem Laden: sequentielle Verarbeitung
.next(processDataStep)
.build()
.build();
}
}Multi-Threading eignet sich, wenn der Reader synchronisiert werden kann. Partitioning ist für große Volumina vorzuziehen, da jede Partition ihren eigenen Reader ohne Konkurrenz hat.
Frage 11: Wie überwacht man die Performance eines Jobs?
Spring Batch stellt Metriken über Listener und JobRepository bereit. Die Integration mit Micrometer ermöglicht den Export nach Prometheus, Grafana oder andere Monitoring-Systeme.
// Monitoring-Konfiguration mit Micrometer
@Configuration
public class BatchMetricsConfig {
private final MeterRegistry meterRegistry;
public BatchMetricsConfig(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Bean
public JobExecutionListener metricsJobListener() {
return new JobExecutionListener() {
private Timer.Sample jobTimer;
@Override
public void beforeJob(JobExecution jobExecution) {
// Startet den Job-Dauer-Timer
jobTimer = Timer.start(meterRegistry);
Counter.builder("batch.job.started")
.tag("job", jobExecution.getJobInstance().getJobName())
.register(meterRegistry)
.increment();
}
@Override
public void afterJob(JobExecution jobExecution) {
// Erfasst die Gesamtdauer
jobTimer.stop(Timer.builder("batch.job.duration")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry));
// Job-Counter nach Status
Counter.builder("batch.job.completed")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry)
.increment();
}
};
}
@Bean
public StepExecutionListener metricsStepListener() {
return new StepExecutionListener() {
@Override
public void afterStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
String stepName = stepExecution.getStepName();
// Throughput-Metriken
Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
return null;
}
};
}
}Frage 12: Welche typischen Fallstricke gibt es beim Partitioning?
Häufige Fehler sind: unausgewogene Partitionen (eine Partition enthält 90% der Daten), nicht thread-safe Reader und falsche Statusverwaltung zwischen Partitionen.
// Partitioner, der die Last tatsächlich ausgleicht
@Component
public class BalancedPartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Zählt die Gesamtanzahl der zu verarbeitenden Items
Integer totalCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);
if (totalCount == null || totalCount == 0) {
return Map.of();
}
// Berechnet die Zielgröße pro Partition
int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);
Map<String, ExecutionContext> partitions = new HashMap<>();
// Verwendet OFFSET/LIMIT für ausgeglichene Partitionen
// Teurer als Ranges, garantiert aber den Ausgleich
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("offset", i * itemsPerPartition);
context.putInt("limit", itemsPerPartition);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}
// OffsetBasedReader.java
// Reader kompatibel mit Offset-basiertem Partitioning
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {
private final JdbcTemplate jdbcTemplate;
private Iterator<OrderRecord> iterator;
@Value("#{stepExecutionContext['offset']}")
private int offset;
@Value("#{stepExecutionContext['limit']}")
private int limit;
public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void open(ExecutionContext executionContext) {
// Lädt genau den dieser Partition zugewiesenen Anteil
List<OrderRecord> records = jdbcTemplate.query(
"SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
new OrderRecordRowMapper(),
limit, offset
);
this.iterator = records.iterator();
}
@Override
public OrderRecord read() {
return iterator.hasNext() ? iterator.next() : null;
}
@Override
public void update(ExecutionContext executionContext) {
// Statusspeicherung für Restart, falls nötig
}
@Override
public void close() {
// Aufräumen
}
}Fortgeschrittene Fragen für Senioren
Frage 13: Wie verwaltet man Abhängigkeiten zwischen Jobs?
Spring Batch verwaltet Abhängigkeiten zwischen Jobs nicht nativ. Lösungen sind: externe Orchestratoren (Airflow, Kubernetes CronJob) oder eigene Implementierung mit JobExplorer.
// Verwaltung von Job-Abhängigkeiten
@Service
public class JobDependencyService {
private final JobExplorer jobExplorer;
private final JobLauncher jobLauncher;
private final Map<String, Job> jobs;
public JobDependencyService(JobExplorer jobExplorer,
JobLauncher jobLauncher,
Map<String, Job> jobs) {
this.jobExplorer = jobExplorer;
this.jobLauncher = jobLauncher;
this.jobs = jobs;
}
public JobExecution runWithDependencies(String jobName,
JobParameters params,
List<String> dependsOn) throws Exception {
// Prüft, dass alle Abhängigkeiten erfolgreich waren
for (String dependency : dependsOn) {
if (!hasSuccessfulExecution(dependency, params)) {
throw new JobExecutionException(
"Dependency not satisfied: " + dependency);
}
}
Job job = jobs.get(jobName);
if (job == null) {
throw new IllegalArgumentException("Unknown job: " + jobName);
}
return jobLauncher.run(job, params);
}
private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
// Sucht eine COMPLETED-Ausführung mit denselben Geschäftsparametern
return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
.anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
}
private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
// Vergleicht Geschäftsparameter (ignoriert Ausführungs-Timestamps)
String actualDate = actual.getString("businessDate");
String expectedDate = expected.getString("businessDate");
return Objects.equals(actualDate, expectedDate);
}
}Frage 14: Wie testet man einen Spring Batch Job effektiv?
Das Testen von Spring Batch Jobs erfordert einen mehrschichtigen Ansatz: Unit-Tests für Komponenten (Reader, Processor, Writer), Integrationstests für Steps und End-to-End-Tests für vollständige Jobs.
// Unit-Test des Processors
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {
@Mock
private PricingService pricingService;
@Mock
private ValidationService validationService;
@InjectMocks
private OrderItemProcessor processor;
@Test
void shouldProcessValidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(true);
when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));
// When
ProcessedOrder result = processor.process(input);
// Then
assertThat(result).isNotNull();
assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
}
@Test
void shouldFilterInvalidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(false);
// When
ProcessedOrder result = processor.process(input);
// Then - null bedeutet gefiltert
assertThat(result).isNull();
verify(pricingService, never()).calculatePrice(any());
}
}// Integrationstest des vollständigen Jobs
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JobRepositoryTestUtils jobRepositoryTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeEach
void setup() {
// Säubert Metadaten zwischen Tests
jobRepositoryTestUtils.removeJobExecutions();
// Setzt Testdaten zurück
jdbcTemplate.execute("DELETE FROM processed_orders");
jdbcTemplate.execute("DELETE FROM orders");
}
@Test
void shouldCompleteJobSuccessfully() throws Exception {
// Given - Testdaten
insertTestOrders(100);
// When
JobParameters params = new JobParametersBuilder()
.addLocalDate("businessDate", LocalDate.now())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// Then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
assertThat(countProcessedOrders()).isEqualTo(100);
}
@Test
void shouldHandleEmptyDataset() throws Exception {
// Given - keine Daten
// When
JobExecution execution = jobLauncherTestUtils.launchJob();
// Then - Job ist auch ohne Daten erfolgreich
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
@Test
void shouldRestartFromFailurePoint() throws Exception {
// Given - simuliert Fehler mitten in der Verarbeitung
insertTestOrders(100);
insertPoisonOrder(50); // Verursacht einen Fehler
// When - erste Ausführung schlägt fehl
JobExecution firstExecution = jobLauncherTestUtils.launchJob();
assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);
// Daten korrigieren
removePoisonOrder(50);
// When - Restart
JobExecution restartExecution = jobLauncherTestUtils.launchJob(
firstExecution.getJobParameters());
// Then - nimmt vom Fehlerpunkt wieder auf
assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
private void insertTestOrders(int count) {
for (int i = 1; i <= count; i++) {
jdbcTemplate.update(
"INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
i, i * 10, BigDecimal.valueOf(i * 10));
}
}
private int countProcessedOrders() {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders", Integer.class);
}
}Frage 15: Wie optimiert man die Datenbank-Schreibperformance?
Das Schreiben wird oft zum Engpass. Optimierungen sind: JDBC Batch Inserts, Deaktivierung von Constraints während des Ladens und Verwendung von Staging-Tabellen.
// Writer optimiert für hohe Volumina
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
private final DataSource dataSource;
public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
this.jdbcTemplate = jdbcTemplate;
this.dataSource = dataSource;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
List<? extends ProcessedOrder> items = chunk.getItems();
if (items.isEmpty()) {
return;
}
// Verwendet PreparedStatement mit Batch
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
"VALUES (?, ?, ?, ?)")) {
for (ProcessedOrder order : items) {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
ps.addBatch();
}
// Führt alle Inserts in einer einzigen Netzwerk-Operation aus
ps.executeBatch();
}
}
}
// StagingTableWriter.java
// Staging-Table-Pattern für sehr hohe Volumina
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {
private final JdbcTemplate jdbcTemplate;
private String stagingTable;
public StagingTableWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void beforeStep(StepExecution stepExecution) {
// Erstellt eine temporäre Tabelle für diesen Step
stagingTable = "staging_orders_" + stepExecution.getId();
jdbcTemplate.execute(
"CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Schreibt in die Staging-Tabelle (ohne FK-Constraints)
String sql = "INSERT INTO " + stagingTable +
" (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
});
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
// Bulk Copy in die finale Tabelle
jdbcTemplate.execute(
"INSERT INTO processed_orders SELECT * FROM " + stagingTable);
}
// Säubert die Staging-Tabelle
jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
return stepExecution.getExitStatus();
}
}Fazit
Die Beherrschung von Spring Batch 5 in technischen Interviews beruht auf einem tiefen Verständnis der internen Mechanismen:
✅ Architektur: Job → Step → Chunk (Reader, Processor, Writer)
✅ Chunk-Verarbeitung: Dimensionierung, Lebenszyklus, Transaktionen
✅ Partitioning: lokal vs. remote, Partition-Balancing
✅ Fehlertoleranz: Skip, Retry, Restart mit passender Policy
✅ Skalierung: Multi-Threading, Parallel Steps, Remote Chunking
✅ Tests: Unit, Integration, End-to-End
✅ Optimierung: Batch Writes, Staging Tables, Monitoring
Fortgeschrittene Fragen prüfen die Fähigkeit, Architekturentscheidungen kontextabhängig zu rechtfertigen: Datenvolumen, Zeitvorgaben, Fehlertoleranz und verfügbare Infrastruktur.
Fang an zu üben!
Teste dein Wissen mit unseren Interview-Simulatoren und technischen Tests.
Tags
Teilen
Verwandte Artikel

Spring Modulith: Modulare Monolith-Architektur erklärt
Spring Modulith lernen, um modulare Monolithen in Java zu bauen. Architektur, Module, asynchrone Events und Tests mit Spring Boot 3.

Spring Boot Interview: Transaktions-Propagation erklärt
Beherrsche die Spring Boot Transaktions-Propagation: REQUIRED, REQUIRES_NEW, NESTED und mehr. 12 Interview-Fragen mit Code und typischen Fallstricken.

Spring Security 6: Vollständige JWT-Authentifizierung
Praxisleitfaden zur Implementierung der JWT-Authentifizierung mit Spring Security 6: Konfiguration, Token-Erzeugung, Validierung und Sicherheits-Best-Practices.