Spring Batch 5 Mülakat: Partitioning, Chunk ve Hata Toleransı

Spring Batch 5 mülakatlarında ustalaşın: partitioning, chunk işleme ve hata toleransı üzerine 15 temel soru, Java 21 kod örnekleriyle.

Spring Batch 5 Mülakat: partitioning, chunk ve hata toleransı

Spring Batch 5, Spring ekosisteminde kurumsal veri işlemenin temel taşıdır. Teknik mülakatlar; sağlam, ölçeklenebilir ve hataya dayanıklı işler tasarlama yetkinliğini değerlendirir. Partitioning, chunk yönelimli işleme ve hata toleransı mekanizmalarına hakim olmak kıdemli geliştiricileri öne çıkarır.

Mülakat odağı

Mülakatçılar derin anlayışı sınar: neden remote chunking yerine partitioning seçilir? Chunk boyutu nasıl doğru belirlenir? Bu mimari kararlar gerçek üretim deneyimini ortaya çıkarır.

Spring Batch 5 temel mimarisi

Soru 1: Spring Batch'in ana bileşenleri nelerdir?

Spring Batch mimarisi üç katmandan oluşur: Application (işler ve iş kodu), Batch Core (işleri başlatmak ve kontrol etmek için runtime sınıfları) ve Infrastructure (RetryTemplate gibi ortak reader, writer ve servisler).

BatchJobConfig.javajava
// Java 21 ile Spring Batch 5 iş yapılandırması
@Configuration
public class BatchJobConfig {

    // JobRepository çalıştırma metadata'sını saklar
    // Restart ve iş izlemeyi mümkün kılar
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public BatchJobConfig(JobRepository jobRepository,
                          PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Job tüm batch sürecini kapsüller
    // Sıralı yürütülen bir veya daha fazla Step'ten oluşur
    @Bean
    public Job importUserJob(Step processUsersStep, Step cleanupStep) {
        return new JobBuilder("importUserJob", jobRepository)
                .start(processUsersStep)      // Ana işleme step'i
                .next(cleanupStep)             // Temizlik step'i
                .build();
    }

    // Step bağımsız bir iş birimini temsil eder
    // İki model: Tasklet (tek görev) veya Chunk (yinelemeli işleme)
    @Bean
    public Step processUsersStep(ItemReader<UserRecord> reader,
                                  ItemProcessor<UserRecord, User> processor,
                                  ItemWriter<User> writer) {
        return new StepBuilder("processUsersStep", jobRepository)
                .<UserRecord, User>chunk(100, transactionManager)  // Her 100 öğede commit
                .reader(reader)       // Kaynak veriyi okur
                .processor(processor) // Her öğeyi dönüştürür
                .writer(writer)       // 100'lük partilerle yazar
                .build();
    }
}

JobRepository çalıştırma durumunu veritabanında tutar. Bu kalıcılık, başarısız bir işin commit edilen verileri yeniden işlemeden tam durduğu yerden devam etmesini sağlar.

Soru 2: Tasklet ile Chunk yönelimli işleme arasındaki fark nedir?

Tasklet ayrık, yinelemeli olmayan bir eylem yürütür: dosya silme, stored procedure çağırma, bildirim e-postası gönderme. Chunk verileri yönetilebilir partilere bölerek büyük hacimleri işler.

CleanupTasklet.javajava
// Tasklet: yineleme olmadan tek eylem
@Component
public class CleanupTasklet implements Tasklet {

    private final Path tempDirectory = Path.of("/tmp/batch-work");

    @Override
    public RepeatStatus execute(StepContribution contribution,
                                 ChunkContext chunkContext) throws Exception {
        // İşleme ait tüm geçici dosyaları siler
        try (var files = Files.walk(tempDirectory)) {
            files.filter(Files::isRegularFile)
                 .forEach(this::deleteQuietly);
        }

        // FINISHED tasklet'in işini tamamladığını gösterir
        // CONTINUABLE çalıştırmayı yeniden başlatır (polling için yararlı)
        return RepeatStatus.FINISHED;
    }

    private void deleteQuietly(Path file) {
        try {
            Files.delete(file);
        } catch (IOException e) {
            // Logla ve devam et - tek bir dosya için işi düşürme
        }
    }
}
ChunkProcessingConfig.javajava
// Chunk işleme: yüksek hacimli işleme
@Configuration
public class ChunkProcessingConfig {

    @Bean
    public Step processOrdersStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<OrderRecord> reader,
                                   ItemProcessor<OrderRecord, ProcessedOrder> processor,
                                   ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("processOrdersStep", jobRepository)
                // 500'lük chunk: 500 öğe okur, işler, yazar, commit eder
                .<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // İlerlemeyi izlemek için listener
                .listener(new ChunkProgressListener())
                .build();
    }
}

Chunk yönelimli işleme kritik faydalar sağlar: optimize bellek yönetimi (yalnızca mevcut chunk bellekte), ayrıntılı işlemler (chunk başına commit) ve son commit edilen chunk'tan hata kurtarma.

Chunk yönelimli işlemenin derinlemesine incelenmesi

Soru 3: Chunk yaşam döngüsü nasıl çalışır?

Her chunk hassas bir döngü izler: yapılandırılmış boyuta ulaşana kadar öğeleri tek tek okuma, her öğeyi ayrı ayrı işleme, ardından grubu yazma. Bir işlem (transaction) tüm chunk'ı sarar.

OrderItemReader.javajava
// ItemReader: bir seferde bir öğe okur
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {

    // @StepScope: her step çalıştırmasında yeni instance
    // Dinamik iş parametrelerinin enjekte edilmesine olanak verir
    @Value("#{jobParameters['startDate']}")
    private LocalDate startDate;

    private Iterator<OrderRecord> orderIterator;

    @BeforeStep
    public void initializeReader(StepExecution stepExecution) {
        // Step başlangıcında veriyi yükler
        List<OrderRecord> orders = fetchOrdersFromDate(startDate);
        this.orderIterator = orders.iterator();
    }

    @Override
    public OrderRecord read() {
        // Veri sonunu işaretlemek için null döner
        // Spring Batch null alana kadar read() çağırır
        if (orderIterator.hasNext()) {
            return orderIterator.next();
        }
        return null;  // Veri kümesinin sonu
    }

    private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
        // Veri kaynağından çeker
        return List.of();  // Gerçek uygulama
    }
}
OrderItemProcessor.javajava
// ItemProcessor: her öğeyi ayrı ayrı dönüştürür
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {

    private final PricingService pricingService;
    private final ValidationService validationService;

    public OrderItemProcessor(PricingService pricingService,
                               ValidationService validationService) {
        this.pricingService = pricingService;
        this.validationService = validationService;
    }

    @Override
    public ProcessedOrder process(OrderRecord item) {
        // null dönmek öğeyi filtreler (yazılmaz)
        if (!validationService.isValid(item)) {
            return null;  // Öğe filtrelendi
        }

        // İş dönüşümü
        BigDecimal finalPrice = pricingService.calculatePrice(item);

        return new ProcessedOrder(
                item.orderId(),
                item.customerId(),
                finalPrice,
                LocalDateTime.now()
        );
    }
}
OrderItemWriter.javajava
// ItemWriter: tüm chunk'ı tek operasyonda yazar
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;

    public OrderItemWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // Chunk işlenmiş tüm öğeleri içerir
        // Optimize performans için batch yazma
        List<? extends ProcessedOrder> items = chunk.getItems();

        jdbcTemplate.batchUpdate(
                "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
                items,
                items.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                }
        );
    }
}

Chunk işleme sırasında bir exception oluşursa transaction geri alınır. İş daha sonra JobRepository'de saklanan metadata'yı kullanarak o chunk'tan devam edebilir.

Soru 4: Optimal chunk boyutu nasıl seçilir?

Chunk boyutu performansı ve bellek tüketimini doğrudan etkiler. Çok küçük chunk commit'leri çoğaltır (overhead). Çok büyük chunk aşırı bellek tüketir ve hata durumunda rollback'i uzatır.

ChunkSizingConfig.javajava
// Dinamik chunk boyut yapılandırması
@Configuration
public class ChunkSizingConfig {

    // Çoğu durum için makul varsayılan
    private static final int DEFAULT_CHUNK_SIZE = 100;

    // Hafif öğeler için (az alan)
    private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;

    // Ağır öğeler için (blob, dokümanlar)
    private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;

    @Bean
    public Step processLightDataStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<LightRecord> reader,
                                      ItemWriter<LightRecord> writer) {
        return new StepBuilder("processLightDataStep", jobRepository)
                // Hafif öğeler: daha az commit için daha büyük chunk'lar
                .<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .writer(writer)
                .build();
    }

    @Bean
    public Step processDocumentsStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<Document> reader,
                                      ItemProcessor<Document, ProcessedDocument> processor,
                                      ItemWriter<ProcessedDocument> writer) {
        return new StepBuilder("processDocumentsStep", jobRepository)
                // Ağır dokümanlar: belleği sınırlamak için daha küçük chunk'lar
                .<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
Pratik kural

Chunk başına 100 öğeyle başlayın, ardından metriklere göre ayarlayın: commit süresi, bellek kullanımı ve rollback süresi. En uygun değeri bulmak için listener kullanın.

Paralel işleme için Partitioning

Soru 5: Partitioning nedir ve ne zaman kullanılır?

Partitioning bir veri kümesini paralel işlenen bağımsız partition'lara böler. Her partition kendi thread'inde (yerel) veya uzak bir worker üzerinde çalışır. Bu yaklaşım restart yeteneğinden ödün vermeden iş hacmini katlar.

PartitionedJobConfig.javajava
// Partition'lı iş yapılandırması
@Configuration
public class PartitionedJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public PartitionedJobConfig(JobRepository jobRepository,
                                 PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    @Bean
    public Job partitionedImportJob(Step partitionedStep) {
        return new JobBuilder("partitionedImportJob", jobRepository)
                .start(partitionedStep)
                .build();
    }

    // Manager step: partition'ları orkestre eder
    @Bean
    public Step partitionedStep(Partitioner partitioner,
                                 Step workerStep,
                                 TaskExecutor taskExecutor) {
        return new StepBuilder("partitionedStep", jobRepository)
                // İşi Partitioner ile böler
                .partitioner("workerStep", partitioner)
                // Her partition için yürütülen step
                .step(workerStep)
                // 8 paralel thread
                .taskExecutor(taskExecutor)
                // Oluşturulacak partition sayısı
                .gridSize(8)
                .build();
    }

    // Paralel yürütme için TaskExecutor
    @Bean
    public TaskExecutor batchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("batch-partition-");
        executor.initialize();
        return executor;
    }
}
RangePartitioner.javajava
// ID aralıklarına dayalı Partitioner
@Component
public class RangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public RangePartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Veri kümesi sınırlarını alır
        Long minId = jdbcTemplate.queryForObject(
                "SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
        Long maxId = jdbcTemplate.queryForObject(
                "SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);

        if (minId == null || maxId == null) {
            return Map.of();  // İşlenecek veri yok
        }

        // Her partition'ın boyutunu hesaplar
        long range = (maxId - minId) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new HashMap<>();

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            long start = minId + (i * range);
            long end = Math.min(start + range - 1, maxId);

            // Her partition kendi sınırlarını alır
            context.putLong("minId", start);
            context.putLong("maxId", end);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

Partitioning öğelerin bağımsız olduğu büyük veri kümeleri için uygundur. Yavaş bir partition'ın tüm işi yavaşlatmasını önlemek için partition'lar dengelenmelidir.

Soru 6: Yerel ve uzak partitioning arasındaki fark nedir?

Yerel partitioning tüm partition'ları aynı JVM'de bir thread pool ile çalıştırır. Uzak partitioning partition'ları mesajlaşma middleware'i aracılığıyla birden çok JVM (worker) arasında dağıtır.

RemotePartitioningConfig.javajava
// Mesajlaşma ile uzak partitioning yapılandırması
@Configuration
public class RemotePartitioningConfig {

    @Bean
    public Step managerStep(JobRepository jobRepository,
                             Partitioner partitioner,
                             MessageChannelPartitionHandler partitionHandler) {
        return new StepBuilder("managerStep", jobRepository)
                .partitioner("workerStep", partitioner)
                // Uzak worker'larla iletişim kuran handler
                .partitionHandler(partitionHandler)
                .build();
    }

    // PartitionHandler ExecutionContext'leri worker'lara gönderir
    @Bean
    public MessageChannelPartitionHandler partitionHandler(
            MessagingTemplate messagingTemplate,
            JobExplorer jobExplorer) {
        MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
        handler.setStepName("workerStep");
        handler.setGridSize(4);
        handler.setMessagingOperations(messagingTemplate);
        handler.setJobExplorer(jobExplorer);
        // Worker'ların tamamlanmasını bekleme zaman aşımı
        handler.setPollInterval(5000L);
        return handler;
    }
}
WorkerConfiguration.javajava
// Worker tarafı yapılandırması
@Configuration
public class WorkerConfiguration {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public WorkerConfiguration(JobRepository jobRepository,
                                PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Worker partition'ları alır ve step'i yürütür
    @Bean
    public Step workerStep(ItemReader<OrderRecord> reader,
                            ItemProcessor<OrderRecord, ProcessedOrder> processor,
                            ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("workerStep", jobRepository)
                .<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
                // Partition parametrelerini almak için @StepScope ile reader
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    // Partition sınırlarını kullanan reader
    @Bean
    @StepScope
    public JdbcCursorItemReader<OrderRecord> partitionedReader(
            DataSource dataSource,
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {
        return new JdbcCursorItemReaderBuilder<OrderRecord>()
                .name("partitionedOrderReader")
                .dataSource(dataSource)
                .sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
                .preparedStatementSetter(ps -> {
                    ps.setLong(1, minId);
                    ps.setLong(2, maxId);
                })
                .rowMapper(new OrderRecordRowMapper())
                .build();
    }
}

Spring Boot mülakatlarında başarılı olmaya hazır mısın?

İnteraktif simülatörler, flashcards ve teknik testlerle pratik yap.

Hata toleransı ve hata kurtarma

Soru 7: Spring Batch hangi hata toleransı mekanizmalarını sunar?

Spring Batch üç tamamlayıcı mekanizma sunar: skip (başarısız öğeleri yoksay), retry (otomatik yeniden dene) ve restart (başarısız bir işi sürdür). Bu mekanizmalar step seviyesinde yapılandırılır.

FaultTolerantStepConfig.javajava
// Tam hata toleransı yapılandırması
@Configuration
public class FaultTolerantStepConfig {

    @Bean
    public Step faultTolerantStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<DataRecord> reader,
                                   ItemProcessor<DataRecord, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   SkipPolicy customSkipPolicy) {
        return new StepBuilder("faultTolerantStep", jobRepository)
                .<DataRecord, ProcessedRecord>chunk(100, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Hataya dayanıklı modu etkinleştirir
                .faultTolerant()
                // SKIP: 10'a kadar doğrulama hatasını yoksayar
                .skipLimit(10)
                .skip(ValidationException.class)
                .skip(DataIntegrityViolationException.class)
                // Bazı hatalar asla atlanmamalıdır
                .noSkip(FatalBatchException.class)
                // RETRY: geçici hataları yeniden dener
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .retry(DeadlockLoserDataAccessException.class)
                // Yeniden denemeler arasında üstel backoff
                .backOffPolicy(exponentialBackOffPolicy())
                // Atlamaları loglamak için listener
                .listener(skipListener())
                .build();
    }

    @Bean
    public BackOffPolicy exponentialBackOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);  // 1 saniye
        policy.setMultiplier(2.0);         // Her denemede ikiye katlanır
        policy.setMaxInterval(10000);      // Maksimum 10 saniye
        return policy;
    }

    @Bean
    public SkipListener<DataRecord, ProcessedRecord> skipListener() {
        return new SkipListener<>() {
            @Override
            public void onSkipInRead(Throwable t) {
                // Okunamayan öğeyi logla
            }

            @Override
            public void onSkipInProcess(DataRecord item, Throwable t) {
                // İşlemede başarısız olan öğeyi logla
            }

            @Override
            public void onSkipInWrite(ProcessedRecord item, Throwable t) {
                // Yazmada başarısız olan öğeyi logla
            }
        };
    }
}

Retry geçici hatalar için uygundur (ağ zaman aşımı, veritabanı deadlock'u). Skip genel işlemeyi engellememesi gereken bireysel veri hataları için uygundur.

Soru 8: Özel bir SkipPolicy nasıl uygulanır?

Özel SkipPolicy ince taneli karar mantığına izin verir: exception tipine, hata sayısına veya belirli iş kriterlerine göre atlama.

AdaptiveSkipPolicy.javajava
// Gelişmiş iş mantığına sahip SkipPolicy
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {

    private static final int MAX_SKIP_COUNT = 100;
    private static final double MAX_SKIP_PERCENTAGE = 0.05;  // %5 maksimum

    private final AtomicInteger totalProcessed = new AtomicInteger(0);
    private final AtomicInteger skipCount = new AtomicInteger(0);

    @Override
    public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
        // Kritik hataları asla atlama
        if (exception instanceof FatalBatchException
                || exception instanceof OutOfMemoryError) {
            return false;
        }

        // Mutlak skip limiti
        if (skipCountSoFar >= MAX_SKIP_COUNT) {
            return false;  // İşi durdur
        }

        // Yüzde limiti
        int total = totalProcessed.get();
        if (total > 1000) {  // Sadece ısınmadan sonra uygula
            double skipPercentage = (double) skipCountSoFar / total;
            if (skipPercentage > MAX_SKIP_PERCENTAGE) {
                return false;  // Oransal olarak çok fazla hata
            }
        }

        // Doğrulama ve veri hatalarını atla
        return exception instanceof ValidationException
                || exception instanceof DataFormatException
                || exception instanceof IllegalArgumentException;
    }

    // İlerlemeyi izlemek için listener tarafından çağrılır
    public void incrementProcessed() {
        totalProcessed.incrementAndGet();
    }
}

Soru 9: Başarısız bir işin restart'ı nasıl çalışır?

JobRepository her çalıştırmanın durumunu saklar. Restart'ta Spring Batch son commit edilen chunk'ı tanımlar ve o noktadan devam eder. Başarıyla işlenen öğeler yeniden işlenmez.

JobRestartService.javajava
// İş restart yönetim servisi
@Service
public class JobRestartService {

    private final JobLauncher jobLauncher;
    private final JobExplorer jobExplorer;
    private final JobRepository jobRepository;
    private final Job importJob;

    public JobRestartService(JobLauncher jobLauncher,
                              JobExplorer jobExplorer,
                              JobRepository jobRepository,
                              @Qualifier("importJob") Job importJob) {
        this.jobLauncher = jobLauncher;
        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
        this.importJob = importJob;
    }

    public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
        // Başarısız çalıştırmayı alır
        JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);

        if (failedExecution == null) {
            throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
        }

        // İşin yeniden başlatılabileceğini doğrular
        if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
            throw new IllegalStateException("Only FAILED jobs can be restarted");
        }

        // Orijinal çalıştırmanın aynı parametrelerini kullanır
        JobParameters originalParams = failedExecution.getJobParameters();

        // İşi yeniden başlatır - son checkpoint'ten otomatik devam eder
        return jobLauncher.run(importJob, originalParams);
    }

    public List<JobExecution> findRestartableJobs() {
        // Henüz yeniden başlatılmayan tüm FAILED çalıştırmaları listeler
        return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.FAILED)
                .filter(this::isRestartable)
                .toList();
    }

    private boolean isRestartable(JobExecution execution) {
        // Daha yeni başarılı bir çalıştırma olmadığını doğrular
        JobInstance instance = execution.getJobInstance();
        return jobExplorer.getJobExecutions(instance).stream()
                .noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
    }
}
Mülakat tuzağı

Bir iş yalnızca JobParameters özdeşse yeniden başlatılabilir. Bir parametreyi değiştirmek yeni bir iş örneği oluşturur ve ilerleme geçmişi kaybolur.

Ölçekleme ve optimizasyon

Soru 10: Hangi ölçekleme stratejileri mevcut?

Spring Batch dört strateji sunar: multi-threaded step (birden fazla thread paralel okur), parallel steps (bağımsız step'ler paralel), remote chunking (dağıtık işleme) ve partitioning (dağıtık veri).

MultiThreadedStepConfig.javajava
// Multi-threaded step: birden fazla thread aynı veri kümesini işler
@Configuration
public class MultiThreadedStepConfig {

    @Bean
    public Step multiThreadedStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<Record> reader,
                                   ItemProcessor<Record, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   TaskExecutor taskExecutor) {
        return new StepBuilder("multiThreadedStep", jobRepository)
                .<Record, ProcessedRecord>chunk(100, transactionManager)
                // DİKKAT: reader thread-safe olmalı
                .reader(synchronizedReader(reader))
                .processor(processor)
                .writer(writer)
                // 4 thread chunk'ları paralel işler
                .taskExecutor(taskExecutor)
                .throttleLimit(4)
                .build();
    }

    // Reader'ı thread-safe yapmak için wrapper
    private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
        SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
        syncReader.setDelegate((ItemStreamReader<Record>) reader);
        return syncReader;
    }
}
ParallelStepsConfig.javajava
// Bağımsız step'leri paralel yürütme
@Configuration
public class ParallelStepsConfig {

    @Bean
    public Job parallelJob(JobRepository jobRepository,
                            Step loadCustomersStep,
                            Step loadProductsStep,
                            Step loadOrdersStep,
                            Step processDataStep) {
        // Paralel akış: customers ve products eşzamanlı yüklenir
        Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
                .start(loadCustomersStep)
                .build();

        Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
                .start(loadProductsStep)
                .build();

        Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
                .start(loadOrdersStep)
                .build();

        // Split akışları paralel yürütür
        return new JobBuilder("parallelJob", jobRepository)
                .start(new FlowBuilder<Flow>("parallelLoadFlow")
                        .split(new SimpleAsyncTaskExecutor())
                        .add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
                        .build())
                // Paralel yüklemeden sonra sıralı işleme
                .next(processDataStep)
                .build()
                .build();
    }
}

Multi-threading reader'ın senkronize edilebildiği durumlara uygundur. Her partition'ın çekişmesiz kendi reader'ı olduğu için partitioning büyük hacimler için tercih edilir.

Soru 11: Bir işin performansı nasıl izlenir?

Spring Batch metrikleri listener'lar ve JobRepository üzerinden açar. Micrometer entegrasyonu Prometheus, Grafana veya diğer izleme sistemlerine dışa aktarmayı sağlar.

BatchMetricsConfig.javajava
// Micrometer ile izleme yapılandırması
@Configuration
public class BatchMetricsConfig {

    private final MeterRegistry meterRegistry;

    public BatchMetricsConfig(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Bean
    public JobExecutionListener metricsJobListener() {
        return new JobExecutionListener() {

            private Timer.Sample jobTimer;

            @Override
            public void beforeJob(JobExecution jobExecution) {
                // İş süre timer'ını başlatır
                jobTimer = Timer.start(meterRegistry);
                Counter.builder("batch.job.started")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .register(meterRegistry)
                        .increment();
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                // Toplam süreyi kaydeder
                jobTimer.stop(Timer.builder("batch.job.duration")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry));

                // Duruma göre iş sayacı
                Counter.builder("batch.job.completed")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry)
                        .increment();
            }
        };
    }

    @Bean
    public StepExecutionListener metricsStepListener() {
        return new StepExecutionListener() {

            @Override
            public void afterStep(StepExecution stepExecution) {
                String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
                String stepName = stepExecution.getStepName();

                // Throughput metrikleri
                Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                return null;
            }
        };
    }
}

Soru 12: Partitioning'in yaygın tuzakları nelerdir?

Sık karşılaşılan hatalar şunlardır: dengesiz partition'lar (bir partition verinin %90'ını içerir), thread-safe olmayan reader'lar ve partition'lar arası yanlış durum yönetimi.

BalancedPartitioner.javajava
// Yükü gerçekten dengeleyen Partitioner
@Component
public class BalancedPartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // İşlenecek toplam öğe sayısını sayar
        Integer totalCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);

        if (totalCount == null || totalCount == 0) {
            return Map.of();
        }

        // Partition başına hedef boyutu hesaplar
        int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);

        Map<String, ExecutionContext> partitions = new HashMap<>();

        // Dengeli partition'lar için OFFSET/LIMIT kullanır
        // Aralıklardan daha pahalı ama dengeyi garanti eder
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("offset", i * itemsPerPartition);
            context.putInt("limit", itemsPerPartition);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

// OffsetBasedReader.java
// Offset tabanlı partitioning ile uyumlu reader
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {

    private final JdbcTemplate jdbcTemplate;
    private Iterator<OrderRecord> iterator;

    @Value("#{stepExecutionContext['offset']}")
    private int offset;

    @Value("#{stepExecutionContext['limit']}")
    private int limit;

    public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // Bu partition'a atanan kısmı tam olarak yükler
        List<OrderRecord> records = jdbcTemplate.query(
                "SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
                new OrderRecordRowMapper(),
                limit, offset
        );
        this.iterator = records.iterator();
    }

    @Override
    public OrderRecord read() {
        return iterator.hasNext() ? iterator.next() : null;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // Gerekirse restart için durum kaydı
    }

    @Override
    public void close() {
        // Temizlik
    }
}

Kıdemliler için ileri düzey sorular

Soru 13: İşler arası bağımlılıklar nasıl yönetilir?

Spring Batch işler arası bağımlılıkları yerel olarak yönetmez. Çözümler şunları içerir: harici orkestratörler (Airflow, Kubernetes CronJob) veya JobExplorer ile özel uygulama.

JobDependencyService.javajava
// İşler arası bağımlılık yönetimi
@Service
public class JobDependencyService {

    private final JobExplorer jobExplorer;
    private final JobLauncher jobLauncher;
    private final Map<String, Job> jobs;

    public JobDependencyService(JobExplorer jobExplorer,
                                  JobLauncher jobLauncher,
                                  Map<String, Job> jobs) {
        this.jobExplorer = jobExplorer;
        this.jobLauncher = jobLauncher;
        this.jobs = jobs;
    }

    public JobExecution runWithDependencies(String jobName,
                                             JobParameters params,
                                             List<String> dependsOn) throws Exception {
        // Tüm bağımlılıkların başarılı olduğunu doğrular
        for (String dependency : dependsOn) {
            if (!hasSuccessfulExecution(dependency, params)) {
                throw new JobExecutionException(
                        "Dependency not satisfied: " + dependency);
            }
        }

        Job job = jobs.get(jobName);
        if (job == null) {
            throw new IllegalArgumentException("Unknown job: " + jobName);
        }

        return jobLauncher.run(job, params);
    }

    private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
        // Aynı iş parametreleriyle COMPLETED bir çalıştırma arar
        return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
                .anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
    }

    private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
        // İş parametrelerini karşılaştırır (yürütme zaman damgalarını yoksayar)
        String actualDate = actual.getString("businessDate");
        String expectedDate = expected.getString("businessDate");
        return Objects.equals(actualDate, expectedDate);
    }
}

Soru 14: Bir Spring Batch işi nasıl etkili test edilir?

Spring Batch işlerini test etmek katmanlı bir yaklaşım gerektirir: bileşenlerin birim testleri (reader, processor, writer), step entegrasyon testleri ve tüm iş için uçtan uca testler.

OrderProcessorTest.javajava
// Processor birim testi
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {

    @Mock
    private PricingService pricingService;

    @Mock
    private ValidationService validationService;

    @InjectMocks
    private OrderItemProcessor processor;

    @Test
    void shouldProcessValidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(true);
        when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));

        // When
        ProcessedOrder result = processor.process(input);

        // Then
        assertThat(result).isNotNull();
        assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
    }

    @Test
    void shouldFilterInvalidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(false);

        // When
        ProcessedOrder result = processor.process(input);

        // Then - null filtrelendi anlamına gelir
        assertThat(result).isNull();
        verify(pricingService, never()).calculatePrice(any());
    }
}
ImportJobIntegrationTest.javajava
// Tam iş entegrasyon testi
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @BeforeEach
    void setup() {
        // Testler arası metadata'yı temizler
        jobRepositoryTestUtils.removeJobExecutions();
        // Test verilerini sıfırlar
        jdbcTemplate.execute("DELETE FROM processed_orders");
        jdbcTemplate.execute("DELETE FROM orders");
    }

    @Test
    void shouldCompleteJobSuccessfully() throws Exception {
        // Given - test verileri
        insertTestOrders(100);

        // When
        JobParameters params = new JobParametersBuilder()
                .addLocalDate("businessDate", LocalDate.now())
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncherTestUtils.launchJob(params);

        // Then
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(countProcessedOrders()).isEqualTo(100);
    }

    @Test
    void shouldHandleEmptyDataset() throws Exception {
        // Given - veri yok

        // When
        JobExecution execution = jobLauncherTestUtils.launchJob();

        // Then - veri olmasa bile iş başarılı olur
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    @Test
    void shouldRestartFromFailurePoint() throws Exception {
        // Given - işlemenin ortasında hata simüle eder
        insertTestOrders(100);
        insertPoisonOrder(50);  // Hataya neden olur

        // When - ilk çalıştırma başarısız olur
        JobExecution firstExecution = jobLauncherTestUtils.launchJob();
        assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);

        // Veriyi düzelt
        removePoisonOrder(50);

        // When - restart
        JobExecution restartExecution = jobLauncherTestUtils.launchJob(
                firstExecution.getJobParameters());

        // Then - hata noktasından devam eder
        assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    private void insertTestOrders(int count) {
        for (int i = 1; i <= count; i++) {
            jdbcTemplate.update(
                    "INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
                    i, i * 10, BigDecimal.valueOf(i * 10));
        }
    }

    private int countProcessedOrders() {
        return jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processed_orders", Integer.class);
    }
}

Soru 15: Veritabanı yazma performansı nasıl optimize edilir?

Yazma genellikle darboğaz haline gelir. Optimizasyonlar şunları içerir: JDBC batch insert'ler, yükleme sırasında constraint'lerin devre dışı bırakılması ve staging tabloların kullanımı.

OptimizedJdbcWriter.javajava
// Yüksek hacimler için optimize edilmiş writer
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;
    private final DataSource dataSource;

    public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
        this.jdbcTemplate = jdbcTemplate;
        this.dataSource = dataSource;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
        List<? extends ProcessedOrder> items = chunk.getItems();

        if (items.isEmpty()) {
            return;
        }

        // Batch ile PreparedStatement kullanır
        try (Connection connection = dataSource.getConnection();
             PreparedStatement ps = connection.prepareStatement(
                     "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
                             "VALUES (?, ?, ?, ?)")) {

            for (ProcessedOrder order : items) {
                ps.setLong(1, order.orderId());
                ps.setLong(2, order.customerId());
                ps.setBigDecimal(3, order.finalPrice());
                ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                ps.addBatch();
            }

            // Tüm insert'leri tek ağ operasyonunda yürütür
            ps.executeBatch();
        }
    }
}

// StagingTableWriter.java
// Çok büyük hacimler için staging table deseni
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {

    private final JdbcTemplate jdbcTemplate;
    private String stagingTable;

    public StagingTableWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Bu step için geçici tablo oluşturur
        stagingTable = "staging_orders_" + stepExecution.getId();
        jdbcTemplate.execute(
                "CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // Staging tabloya yazar (FK constraint olmadan)
        String sql = "INSERT INTO " + stagingTable +
                " (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";

        jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                });
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
            // Hedef tabloya bulk copy
            jdbcTemplate.execute(
                    "INSERT INTO processed_orders SELECT * FROM " + stagingTable);
        }
        // Staging tabloyu temizler
        jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
        return stepExecution.getExitStatus();
    }
}

Sonuç

Teknik mülakatlarda Spring Batch 5'e hakim olmak iç mekanizmaların derin anlaşılmasına dayanır:

Mimari: Job → Step → Chunk (Reader, Processor, Writer)

Chunk işleme: boyutlandırma, yaşam döngüsü, transaction'lar

Partitioning: yerel vs uzak, partition dengeleme

Hata toleransı: skip, retry, restart uygun politikalarla

Ölçekleme: multi-threading, parallel steps, remote chunking

Test: birim, entegrasyon, uçtan uca

Optimizasyon: batch yazma, staging tablolar, izleme

İleri düzey sorular bağlama göre mimari kararları gerekçelendirme yetkinliğini değerlendirir: veri hacmi, zaman kısıtları, hata toleransı ve mevcut altyapı.

Pratik yapmaya başla!

Mülakat simülatörleri ve teknik testlerle bilgini test et.

Etiketler

#spring batch
#spring boot
#java
#batch processing
#interview questions

Paylaş

İlgili makaleler