Spring Batch 5 Mülakat: Partitioning, Chunk ve Hata Toleransı
Spring Batch 5 mülakatlarında ustalaşın: partitioning, chunk işleme ve hata toleransı üzerine 15 temel soru, Java 21 kod örnekleriyle.

Spring Batch 5, Spring ekosisteminde kurumsal veri işlemenin temel taşıdır. Teknik mülakatlar; sağlam, ölçeklenebilir ve hataya dayanıklı işler tasarlama yetkinliğini değerlendirir. Partitioning, chunk yönelimli işleme ve hata toleransı mekanizmalarına hakim olmak kıdemli geliştiricileri öne çıkarır.
Mülakatçılar derin anlayışı sınar: neden remote chunking yerine partitioning seçilir? Chunk boyutu nasıl doğru belirlenir? Bu mimari kararlar gerçek üretim deneyimini ortaya çıkarır.
Spring Batch 5 temel mimarisi
Soru 1: Spring Batch'in ana bileşenleri nelerdir?
Spring Batch mimarisi üç katmandan oluşur: Application (işler ve iş kodu), Batch Core (işleri başlatmak ve kontrol etmek için runtime sınıfları) ve Infrastructure (RetryTemplate gibi ortak reader, writer ve servisler).
// Java 21 ile Spring Batch 5 iş yapılandırması
@Configuration
public class BatchJobConfig {
// JobRepository çalıştırma metadata'sını saklar
// Restart ve iş izlemeyi mümkün kılar
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public BatchJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Job tüm batch sürecini kapsüller
// Sıralı yürütülen bir veya daha fazla Step'ten oluşur
@Bean
public Job importUserJob(Step processUsersStep, Step cleanupStep) {
return new JobBuilder("importUserJob", jobRepository)
.start(processUsersStep) // Ana işleme step'i
.next(cleanupStep) // Temizlik step'i
.build();
}
// Step bağımsız bir iş birimini temsil eder
// İki model: Tasklet (tek görev) veya Chunk (yinelemeli işleme)
@Bean
public Step processUsersStep(ItemReader<UserRecord> reader,
ItemProcessor<UserRecord, User> processor,
ItemWriter<User> writer) {
return new StepBuilder("processUsersStep", jobRepository)
.<UserRecord, User>chunk(100, transactionManager) // Her 100 öğede commit
.reader(reader) // Kaynak veriyi okur
.processor(processor) // Her öğeyi dönüştürür
.writer(writer) // 100'lük partilerle yazar
.build();
}
}JobRepository çalıştırma durumunu veritabanında tutar. Bu kalıcılık, başarısız bir işin commit edilen verileri yeniden işlemeden tam durduğu yerden devam etmesini sağlar.
Soru 2: Tasklet ile Chunk yönelimli işleme arasındaki fark nedir?
Tasklet ayrık, yinelemeli olmayan bir eylem yürütür: dosya silme, stored procedure çağırma, bildirim e-postası gönderme. Chunk verileri yönetilebilir partilere bölerek büyük hacimleri işler.
// Tasklet: yineleme olmadan tek eylem
@Component
public class CleanupTasklet implements Tasklet {
private final Path tempDirectory = Path.of("/tmp/batch-work");
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// İşleme ait tüm geçici dosyaları siler
try (var files = Files.walk(tempDirectory)) {
files.filter(Files::isRegularFile)
.forEach(this::deleteQuietly);
}
// FINISHED tasklet'in işini tamamladığını gösterir
// CONTINUABLE çalıştırmayı yeniden başlatır (polling için yararlı)
return RepeatStatus.FINISHED;
}
private void deleteQuietly(Path file) {
try {
Files.delete(file);
} catch (IOException e) {
// Logla ve devam et - tek bir dosya için işi düşürme
}
}
}// Chunk işleme: yüksek hacimli işleme
@Configuration
public class ChunkProcessingConfig {
@Bean
public Step processOrdersStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("processOrdersStep", jobRepository)
// 500'lük chunk: 500 öğe okur, işler, yazar, commit eder
.<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// İlerlemeyi izlemek için listener
.listener(new ChunkProgressListener())
.build();
}
}Chunk yönelimli işleme kritik faydalar sağlar: optimize bellek yönetimi (yalnızca mevcut chunk bellekte), ayrıntılı işlemler (chunk başına commit) ve son commit edilen chunk'tan hata kurtarma.
Chunk yönelimli işlemenin derinlemesine incelenmesi
Soru 3: Chunk yaşam döngüsü nasıl çalışır?
Her chunk hassas bir döngü izler: yapılandırılmış boyuta ulaşana kadar öğeleri tek tek okuma, her öğeyi ayrı ayrı işleme, ardından grubu yazma. Bir işlem (transaction) tüm chunk'ı sarar.
// ItemReader: bir seferde bir öğe okur
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {
// @StepScope: her step çalıştırmasında yeni instance
// Dinamik iş parametrelerinin enjekte edilmesine olanak verir
@Value("#{jobParameters['startDate']}")
private LocalDate startDate;
private Iterator<OrderRecord> orderIterator;
@BeforeStep
public void initializeReader(StepExecution stepExecution) {
// Step başlangıcında veriyi yükler
List<OrderRecord> orders = fetchOrdersFromDate(startDate);
this.orderIterator = orders.iterator();
}
@Override
public OrderRecord read() {
// Veri sonunu işaretlemek için null döner
// Spring Batch null alana kadar read() çağırır
if (orderIterator.hasNext()) {
return orderIterator.next();
}
return null; // Veri kümesinin sonu
}
private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
// Veri kaynağından çeker
return List.of(); // Gerçek uygulama
}
}// ItemProcessor: her öğeyi ayrı ayrı dönüştürür
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {
private final PricingService pricingService;
private final ValidationService validationService;
public OrderItemProcessor(PricingService pricingService,
ValidationService validationService) {
this.pricingService = pricingService;
this.validationService = validationService;
}
@Override
public ProcessedOrder process(OrderRecord item) {
// null dönmek öğeyi filtreler (yazılmaz)
if (!validationService.isValid(item)) {
return null; // Öğe filtrelendi
}
// İş dönüşümü
BigDecimal finalPrice = pricingService.calculatePrice(item);
return new ProcessedOrder(
item.orderId(),
item.customerId(),
finalPrice,
LocalDateTime.now()
);
}
}// ItemWriter: tüm chunk'ı tek operasyonda yazar
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
public OrderItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Chunk işlenmiş tüm öğeleri içerir
// Optimize performans için batch yazma
List<? extends ProcessedOrder> items = chunk.getItems();
jdbcTemplate.batchUpdate(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
items,
items.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
}
);
}
}Chunk işleme sırasında bir exception oluşursa transaction geri alınır. İş daha sonra JobRepository'de saklanan metadata'yı kullanarak o chunk'tan devam edebilir.
Soru 4: Optimal chunk boyutu nasıl seçilir?
Chunk boyutu performansı ve bellek tüketimini doğrudan etkiler. Çok küçük chunk commit'leri çoğaltır (overhead). Çok büyük chunk aşırı bellek tüketir ve hata durumunda rollback'i uzatır.
// Dinamik chunk boyut yapılandırması
@Configuration
public class ChunkSizingConfig {
// Çoğu durum için makul varsayılan
private static final int DEFAULT_CHUNK_SIZE = 100;
// Hafif öğeler için (az alan)
private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;
// Ağır öğeler için (blob, dokümanlar)
private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;
@Bean
public Step processLightDataStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<LightRecord> reader,
ItemWriter<LightRecord> writer) {
return new StepBuilder("processLightDataStep", jobRepository)
// Hafif öğeler: daha az commit için daha büyük chunk'lar
.<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Step processDocumentsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<Document> reader,
ItemProcessor<Document, ProcessedDocument> processor,
ItemWriter<ProcessedDocument> writer) {
return new StepBuilder("processDocumentsStep", jobRepository)
// Ağır dokümanlar: belleği sınırlamak için daha küçük chunk'lar
.<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}Chunk başına 100 öğeyle başlayın, ardından metriklere göre ayarlayın: commit süresi, bellek kullanımı ve rollback süresi. En uygun değeri bulmak için listener kullanın.
Paralel işleme için Partitioning
Soru 5: Partitioning nedir ve ne zaman kullanılır?
Partitioning bir veri kümesini paralel işlenen bağımsız partition'lara böler. Her partition kendi thread'inde (yerel) veya uzak bir worker üzerinde çalışır. Bu yaklaşım restart yeteneğinden ödün vermeden iş hacmini katlar.
// Partition'lı iş yapılandırması
@Configuration
public class PartitionedJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public PartitionedJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
@Bean
public Job partitionedImportJob(Step partitionedStep) {
return new JobBuilder("partitionedImportJob", jobRepository)
.start(partitionedStep)
.build();
}
// Manager step: partition'ları orkestre eder
@Bean
public Step partitionedStep(Partitioner partitioner,
Step workerStep,
TaskExecutor taskExecutor) {
return new StepBuilder("partitionedStep", jobRepository)
// İşi Partitioner ile böler
.partitioner("workerStep", partitioner)
// Her partition için yürütülen step
.step(workerStep)
// 8 paralel thread
.taskExecutor(taskExecutor)
// Oluşturulacak partition sayısı
.gridSize(8)
.build();
}
// Paralel yürütme için TaskExecutor
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("batch-partition-");
executor.initialize();
return executor;
}
}// ID aralıklarına dayalı Partitioner
@Component
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Veri kümesi sınırlarını alır
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);
if (minId == null || maxId == null) {
return Map.of(); // İşlenecek veri yok
}
// Her partition'ın boyutunu hesaplar
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long start = minId + (i * range);
long end = Math.min(start + range - 1, maxId);
// Her partition kendi sınırlarını alır
context.putLong("minId", start);
context.putLong("maxId", end);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}Partitioning öğelerin bağımsız olduğu büyük veri kümeleri için uygundur. Yavaş bir partition'ın tüm işi yavaşlatmasını önlemek için partition'lar dengelenmelidir.
Soru 6: Yerel ve uzak partitioning arasındaki fark nedir?
Yerel partitioning tüm partition'ları aynı JVM'de bir thread pool ile çalıştırır. Uzak partitioning partition'ları mesajlaşma middleware'i aracılığıyla birden çok JVM (worker) arasında dağıtır.
// Mesajlaşma ile uzak partitioning yapılandırması
@Configuration
public class RemotePartitioningConfig {
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
MessageChannelPartitionHandler partitionHandler) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
// Uzak worker'larla iletişim kuran handler
.partitionHandler(partitionHandler)
.build();
}
// PartitionHandler ExecutionContext'leri worker'lara gönderir
@Bean
public MessageChannelPartitionHandler partitionHandler(
MessagingTemplate messagingTemplate,
JobExplorer jobExplorer) {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("workerStep");
handler.setGridSize(4);
handler.setMessagingOperations(messagingTemplate);
handler.setJobExplorer(jobExplorer);
// Worker'ların tamamlanmasını bekleme zaman aşımı
handler.setPollInterval(5000L);
return handler;
}
}// Worker tarafı yapılandırması
@Configuration
public class WorkerConfiguration {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public WorkerConfiguration(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Worker partition'ları alır ve step'i yürütür
@Bean
public Step workerStep(ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("workerStep", jobRepository)
.<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
// Partition parametrelerini almak için @StepScope ile reader
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// Partition sınırlarını kullanan reader
@Bean
@StepScope
public JdbcCursorItemReader<OrderRecord> partitionedReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcCursorItemReaderBuilder<OrderRecord>()
.name("partitionedOrderReader")
.dataSource(dataSource)
.sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
.preparedStatementSetter(ps -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.rowMapper(new OrderRecordRowMapper())
.build();
}
}Spring Boot mülakatlarında başarılı olmaya hazır mısın?
İnteraktif simülatörler, flashcards ve teknik testlerle pratik yap.
Hata toleransı ve hata kurtarma
Soru 7: Spring Batch hangi hata toleransı mekanizmalarını sunar?
Spring Batch üç tamamlayıcı mekanizma sunar: skip (başarısız öğeleri yoksay), retry (otomatik yeniden dene) ve restart (başarısız bir işi sürdür). Bu mekanizmalar step seviyesinde yapılandırılır.
// Tam hata toleransı yapılandırması
@Configuration
public class FaultTolerantStepConfig {
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<DataRecord> reader,
ItemProcessor<DataRecord, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
SkipPolicy customSkipPolicy) {
return new StepBuilder("faultTolerantStep", jobRepository)
.<DataRecord, ProcessedRecord>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Hataya dayanıklı modu etkinleştirir
.faultTolerant()
// SKIP: 10'a kadar doğrulama hatasını yoksayar
.skipLimit(10)
.skip(ValidationException.class)
.skip(DataIntegrityViolationException.class)
// Bazı hatalar asla atlanmamalıdır
.noSkip(FatalBatchException.class)
// RETRY: geçici hataları yeniden dener
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(DeadlockLoserDataAccessException.class)
// Yeniden denemeler arasında üstel backoff
.backOffPolicy(exponentialBackOffPolicy())
// Atlamaları loglamak için listener
.listener(skipListener())
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000); // 1 saniye
policy.setMultiplier(2.0); // Her denemede ikiye katlanır
policy.setMaxInterval(10000); // Maksimum 10 saniye
return policy;
}
@Bean
public SkipListener<DataRecord, ProcessedRecord> skipListener() {
return new SkipListener<>() {
@Override
public void onSkipInRead(Throwable t) {
// Okunamayan öğeyi logla
}
@Override
public void onSkipInProcess(DataRecord item, Throwable t) {
// İşlemede başarısız olan öğeyi logla
}
@Override
public void onSkipInWrite(ProcessedRecord item, Throwable t) {
// Yazmada başarısız olan öğeyi logla
}
};
}
}Retry geçici hatalar için uygundur (ağ zaman aşımı, veritabanı deadlock'u). Skip genel işlemeyi engellememesi gereken bireysel veri hataları için uygundur.
Soru 8: Özel bir SkipPolicy nasıl uygulanır?
Özel SkipPolicy ince taneli karar mantığına izin verir: exception tipine, hata sayısına veya belirli iş kriterlerine göre atlama.
// Gelişmiş iş mantığına sahip SkipPolicy
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 100;
private static final double MAX_SKIP_PERCENTAGE = 0.05; // %5 maksimum
private final AtomicInteger totalProcessed = new AtomicInteger(0);
private final AtomicInteger skipCount = new AtomicInteger(0);
@Override
public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
// Kritik hataları asla atlama
if (exception instanceof FatalBatchException
|| exception instanceof OutOfMemoryError) {
return false;
}
// Mutlak skip limiti
if (skipCountSoFar >= MAX_SKIP_COUNT) {
return false; // İşi durdur
}
// Yüzde limiti
int total = totalProcessed.get();
if (total > 1000) { // Sadece ısınmadan sonra uygula
double skipPercentage = (double) skipCountSoFar / total;
if (skipPercentage > MAX_SKIP_PERCENTAGE) {
return false; // Oransal olarak çok fazla hata
}
}
// Doğrulama ve veri hatalarını atla
return exception instanceof ValidationException
|| exception instanceof DataFormatException
|| exception instanceof IllegalArgumentException;
}
// İlerlemeyi izlemek için listener tarafından çağrılır
public void incrementProcessed() {
totalProcessed.incrementAndGet();
}
}Soru 9: Başarısız bir işin restart'ı nasıl çalışır?
JobRepository her çalıştırmanın durumunu saklar. Restart'ta Spring Batch son commit edilen chunk'ı tanımlar ve o noktadan devam eder. Başarıyla işlenen öğeler yeniden işlenmez.
// İş restart yönetim servisi
@Service
public class JobRestartService {
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final Job importJob;
public JobRestartService(JobLauncher jobLauncher,
JobExplorer jobExplorer,
JobRepository jobRepository,
@Qualifier("importJob") Job importJob) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.importJob = importJob;
}
public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
// Başarısız çalıştırmayı alır
JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);
if (failedExecution == null) {
throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
}
// İşin yeniden başlatılabileceğini doğrular
if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
throw new IllegalStateException("Only FAILED jobs can be restarted");
}
// Orijinal çalıştırmanın aynı parametrelerini kullanır
JobParameters originalParams = failedExecution.getJobParameters();
// İşi yeniden başlatır - son checkpoint'ten otomatik devam eder
return jobLauncher.run(importJob, originalParams);
}
public List<JobExecution> findRestartableJobs() {
// Henüz yeniden başlatılmayan tüm FAILED çalıştırmaları listeler
return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.FAILED)
.filter(this::isRestartable)
.toList();
}
private boolean isRestartable(JobExecution execution) {
// Daha yeni başarılı bir çalıştırma olmadığını doğrular
JobInstance instance = execution.getJobInstance();
return jobExplorer.getJobExecutions(instance).stream()
.noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
}
}Bir iş yalnızca JobParameters özdeşse yeniden başlatılabilir. Bir parametreyi değiştirmek yeni bir iş örneği oluşturur ve ilerleme geçmişi kaybolur.
Ölçekleme ve optimizasyon
Soru 10: Hangi ölçekleme stratejileri mevcut?
Spring Batch dört strateji sunar: multi-threaded step (birden fazla thread paralel okur), parallel steps (bağımsız step'ler paralel), remote chunking (dağıtık işleme) ve partitioning (dağıtık veri).
// Multi-threaded step: birden fazla thread aynı veri kümesini işler
@Configuration
public class MultiThreadedStepConfig {
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<Record> reader,
ItemProcessor<Record, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
TaskExecutor taskExecutor) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Record, ProcessedRecord>chunk(100, transactionManager)
// DİKKAT: reader thread-safe olmalı
.reader(synchronizedReader(reader))
.processor(processor)
.writer(writer)
// 4 thread chunk'ları paralel işler
.taskExecutor(taskExecutor)
.throttleLimit(4)
.build();
}
// Reader'ı thread-safe yapmak için wrapper
private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate((ItemStreamReader<Record>) reader);
return syncReader;
}
}// Bağımsız step'leri paralel yürütme
@Configuration
public class ParallelStepsConfig {
@Bean
public Job parallelJob(JobRepository jobRepository,
Step loadCustomersStep,
Step loadProductsStep,
Step loadOrdersStep,
Step processDataStep) {
// Paralel akış: customers ve products eşzamanlı yüklenir
Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
.start(loadCustomersStep)
.build();
Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
.start(loadProductsStep)
.build();
Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
.start(loadOrdersStep)
.build();
// Split akışları paralel yürütür
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("parallelLoadFlow")
.split(new SimpleAsyncTaskExecutor())
.add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
.build())
// Paralel yüklemeden sonra sıralı işleme
.next(processDataStep)
.build()
.build();
}
}Multi-threading reader'ın senkronize edilebildiği durumlara uygundur. Her partition'ın çekişmesiz kendi reader'ı olduğu için partitioning büyük hacimler için tercih edilir.
Soru 11: Bir işin performansı nasıl izlenir?
Spring Batch metrikleri listener'lar ve JobRepository üzerinden açar. Micrometer entegrasyonu Prometheus, Grafana veya diğer izleme sistemlerine dışa aktarmayı sağlar.
// Micrometer ile izleme yapılandırması
@Configuration
public class BatchMetricsConfig {
private final MeterRegistry meterRegistry;
public BatchMetricsConfig(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Bean
public JobExecutionListener metricsJobListener() {
return new JobExecutionListener() {
private Timer.Sample jobTimer;
@Override
public void beforeJob(JobExecution jobExecution) {
// İş süre timer'ını başlatır
jobTimer = Timer.start(meterRegistry);
Counter.builder("batch.job.started")
.tag("job", jobExecution.getJobInstance().getJobName())
.register(meterRegistry)
.increment();
}
@Override
public void afterJob(JobExecution jobExecution) {
// Toplam süreyi kaydeder
jobTimer.stop(Timer.builder("batch.job.duration")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry));
// Duruma göre iş sayacı
Counter.builder("batch.job.completed")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry)
.increment();
}
};
}
@Bean
public StepExecutionListener metricsStepListener() {
return new StepExecutionListener() {
@Override
public void afterStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
String stepName = stepExecution.getStepName();
// Throughput metrikleri
Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
return null;
}
};
}
}Soru 12: Partitioning'in yaygın tuzakları nelerdir?
Sık karşılaşılan hatalar şunlardır: dengesiz partition'lar (bir partition verinin %90'ını içerir), thread-safe olmayan reader'lar ve partition'lar arası yanlış durum yönetimi.
// Yükü gerçekten dengeleyen Partitioner
@Component
public class BalancedPartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// İşlenecek toplam öğe sayısını sayar
Integer totalCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);
if (totalCount == null || totalCount == 0) {
return Map.of();
}
// Partition başına hedef boyutu hesaplar
int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);
Map<String, ExecutionContext> partitions = new HashMap<>();
// Dengeli partition'lar için OFFSET/LIMIT kullanır
// Aralıklardan daha pahalı ama dengeyi garanti eder
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("offset", i * itemsPerPartition);
context.putInt("limit", itemsPerPartition);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}
// OffsetBasedReader.java
// Offset tabanlı partitioning ile uyumlu reader
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {
private final JdbcTemplate jdbcTemplate;
private Iterator<OrderRecord> iterator;
@Value("#{stepExecutionContext['offset']}")
private int offset;
@Value("#{stepExecutionContext['limit']}")
private int limit;
public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void open(ExecutionContext executionContext) {
// Bu partition'a atanan kısmı tam olarak yükler
List<OrderRecord> records = jdbcTemplate.query(
"SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
new OrderRecordRowMapper(),
limit, offset
);
this.iterator = records.iterator();
}
@Override
public OrderRecord read() {
return iterator.hasNext() ? iterator.next() : null;
}
@Override
public void update(ExecutionContext executionContext) {
// Gerekirse restart için durum kaydı
}
@Override
public void close() {
// Temizlik
}
}Kıdemliler için ileri düzey sorular
Soru 13: İşler arası bağımlılıklar nasıl yönetilir?
Spring Batch işler arası bağımlılıkları yerel olarak yönetmez. Çözümler şunları içerir: harici orkestratörler (Airflow, Kubernetes CronJob) veya JobExplorer ile özel uygulama.
// İşler arası bağımlılık yönetimi
@Service
public class JobDependencyService {
private final JobExplorer jobExplorer;
private final JobLauncher jobLauncher;
private final Map<String, Job> jobs;
public JobDependencyService(JobExplorer jobExplorer,
JobLauncher jobLauncher,
Map<String, Job> jobs) {
this.jobExplorer = jobExplorer;
this.jobLauncher = jobLauncher;
this.jobs = jobs;
}
public JobExecution runWithDependencies(String jobName,
JobParameters params,
List<String> dependsOn) throws Exception {
// Tüm bağımlılıkların başarılı olduğunu doğrular
for (String dependency : dependsOn) {
if (!hasSuccessfulExecution(dependency, params)) {
throw new JobExecutionException(
"Dependency not satisfied: " + dependency);
}
}
Job job = jobs.get(jobName);
if (job == null) {
throw new IllegalArgumentException("Unknown job: " + jobName);
}
return jobLauncher.run(job, params);
}
private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
// Aynı iş parametreleriyle COMPLETED bir çalıştırma arar
return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
.anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
}
private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
// İş parametrelerini karşılaştırır (yürütme zaman damgalarını yoksayar)
String actualDate = actual.getString("businessDate");
String expectedDate = expected.getString("businessDate");
return Objects.equals(actualDate, expectedDate);
}
}Soru 14: Bir Spring Batch işi nasıl etkili test edilir?
Spring Batch işlerini test etmek katmanlı bir yaklaşım gerektirir: bileşenlerin birim testleri (reader, processor, writer), step entegrasyon testleri ve tüm iş için uçtan uca testler.
// Processor birim testi
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {
@Mock
private PricingService pricingService;
@Mock
private ValidationService validationService;
@InjectMocks
private OrderItemProcessor processor;
@Test
void shouldProcessValidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(true);
when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));
// When
ProcessedOrder result = processor.process(input);
// Then
assertThat(result).isNotNull();
assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
}
@Test
void shouldFilterInvalidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(false);
// When
ProcessedOrder result = processor.process(input);
// Then - null filtrelendi anlamına gelir
assertThat(result).isNull();
verify(pricingService, never()).calculatePrice(any());
}
}// Tam iş entegrasyon testi
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JobRepositoryTestUtils jobRepositoryTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeEach
void setup() {
// Testler arası metadata'yı temizler
jobRepositoryTestUtils.removeJobExecutions();
// Test verilerini sıfırlar
jdbcTemplate.execute("DELETE FROM processed_orders");
jdbcTemplate.execute("DELETE FROM orders");
}
@Test
void shouldCompleteJobSuccessfully() throws Exception {
// Given - test verileri
insertTestOrders(100);
// When
JobParameters params = new JobParametersBuilder()
.addLocalDate("businessDate", LocalDate.now())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// Then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
assertThat(countProcessedOrders()).isEqualTo(100);
}
@Test
void shouldHandleEmptyDataset() throws Exception {
// Given - veri yok
// When
JobExecution execution = jobLauncherTestUtils.launchJob();
// Then - veri olmasa bile iş başarılı olur
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
@Test
void shouldRestartFromFailurePoint() throws Exception {
// Given - işlemenin ortasında hata simüle eder
insertTestOrders(100);
insertPoisonOrder(50); // Hataya neden olur
// When - ilk çalıştırma başarısız olur
JobExecution firstExecution = jobLauncherTestUtils.launchJob();
assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);
// Veriyi düzelt
removePoisonOrder(50);
// When - restart
JobExecution restartExecution = jobLauncherTestUtils.launchJob(
firstExecution.getJobParameters());
// Then - hata noktasından devam eder
assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
private void insertTestOrders(int count) {
for (int i = 1; i <= count; i++) {
jdbcTemplate.update(
"INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
i, i * 10, BigDecimal.valueOf(i * 10));
}
}
private int countProcessedOrders() {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders", Integer.class);
}
}Soru 15: Veritabanı yazma performansı nasıl optimize edilir?
Yazma genellikle darboğaz haline gelir. Optimizasyonlar şunları içerir: JDBC batch insert'ler, yükleme sırasında constraint'lerin devre dışı bırakılması ve staging tabloların kullanımı.
// Yüksek hacimler için optimize edilmiş writer
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
private final DataSource dataSource;
public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
this.jdbcTemplate = jdbcTemplate;
this.dataSource = dataSource;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
List<? extends ProcessedOrder> items = chunk.getItems();
if (items.isEmpty()) {
return;
}
// Batch ile PreparedStatement kullanır
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
"VALUES (?, ?, ?, ?)")) {
for (ProcessedOrder order : items) {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
ps.addBatch();
}
// Tüm insert'leri tek ağ operasyonunda yürütür
ps.executeBatch();
}
}
}
// StagingTableWriter.java
// Çok büyük hacimler için staging table deseni
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {
private final JdbcTemplate jdbcTemplate;
private String stagingTable;
public StagingTableWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void beforeStep(StepExecution stepExecution) {
// Bu step için geçici tablo oluşturur
stagingTable = "staging_orders_" + stepExecution.getId();
jdbcTemplate.execute(
"CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Staging tabloya yazar (FK constraint olmadan)
String sql = "INSERT INTO " + stagingTable +
" (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
});
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
// Hedef tabloya bulk copy
jdbcTemplate.execute(
"INSERT INTO processed_orders SELECT * FROM " + stagingTable);
}
// Staging tabloyu temizler
jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
return stepExecution.getExitStatus();
}
}Sonuç
Teknik mülakatlarda Spring Batch 5'e hakim olmak iç mekanizmaların derin anlaşılmasına dayanır:
✅ Mimari: Job → Step → Chunk (Reader, Processor, Writer)
✅ Chunk işleme: boyutlandırma, yaşam döngüsü, transaction'lar
✅ Partitioning: yerel vs uzak, partition dengeleme
✅ Hata toleransı: skip, retry, restart uygun politikalarla
✅ Ölçekleme: multi-threading, parallel steps, remote chunking
✅ Test: birim, entegrasyon, uçtan uca
✅ Optimizasyon: batch yazma, staging tablolar, izleme
İleri düzey sorular bağlama göre mimari kararları gerekçelendirme yetkinliğini değerlendirir: veri hacmi, zaman kısıtları, hata toleransı ve mevcut altyapı.
Pratik yapmaya başla!
Mülakat simülatörleri ve teknik testlerle bilgini test et.
Etiketler
Paylaş
İlgili makaleler

Spring Modulith: Modüler Monolit Mimarisi Açıklaması
Java'da modüler monolitler oluşturmak için Spring Modulith öğrenin. Mimari, modüller, asenkron eventler ve Spring Boot 3 örnekleriyle test.

Spring Boot Mülakatı: İşlem Yayılımı Açıklandı
Spring Boot işlem yayılımına hakim olun: REQUIRED, REQUIRES_NEW, NESTED ve daha fazlası. Kod örnekleri ve yaygın tuzaklarla 12 mülakat sorusu.

Spring Security 6: Eksiksiz JWT Kimlik Doğrulaması
Spring Security 6 ile JWT kimlik doğrulamasını uygulamak için pratik rehber: yapılandırma, token üretimi, doğrulama ve güvenlik için en iyi uygulamalar.