สัมภาษณ์ Spring Batch 5: Partitioning, Chunk และ Fault Tolerance

เชี่ยวชาญการสัมภาษณ์ Spring Batch 5: 15 คำถามสำคัญเกี่ยวกับ partitioning การประมวลผลแบบ chunk และความทนทานต่อข้อผิดพลาด พร้อมตัวอย่างโค้ด Java 21

สัมภาษณ์ Spring Batch 5: partitioning, chunk และ fault tolerance

Spring Batch 5 เป็นเสาหลักของการประมวลผลข้อมูลขนาดใหญ่ในระบบนิเวศ Spring การสัมภาษณ์ทางเทคนิคประเมินความสามารถในการออกแบบ job ที่แข็งแกร่ง ขยายขนาดได้ และทนทานต่อความล้มเหลว ความเชี่ยวชาญใน partitioning, การประมวลผลแบบ chunk และกลไกความทนทานต่อข้อผิดพลาดทำให้นักพัฒนาระดับซีเนียร์โดดเด่น

ประเด็นสำคัญในการสัมภาษณ์

ผู้สรรหาทดสอบความเข้าใจเชิงลึก ทำไมจึงเลือก partitioning แทน remote chunking และจะกำหนดขนาด chunk ที่เหมาะสมอย่างไร การตัดสินใจเชิงสถาปัตยกรรมเหล่านี้สะท้อนประสบการณ์การใช้งานจริง

สถาปัตยกรรมหลักของ Spring Batch 5

คำถามที่ 1: ส่วนประกอบหลักของ Spring Batch มีอะไรบ้าง

สถาปัตยกรรม Spring Batch ประกอบด้วยสามชั้น ได้แก่ Application (job และโค้ดธุรกิจ), Batch Core (คลาสรันไทม์เพื่อเริ่มและควบคุม job) และ Infrastructure (reader, writer และบริการทั่วไปเช่น RetryTemplate)

BatchJobConfig.javajava
// การกำหนดค่า job ของ Spring Batch 5 ด้วย Java 21
@Configuration
public class BatchJobConfig {

    // JobRepository เก็บ metadata การประมวลผล
    // เปิดให้สามารถ restart และเฝ้าติดตาม job ได้
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public BatchJobConfig(JobRepository jobRepository,
                          PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Job ห่อหุ้มกระบวนการ batch ทั้งหมด
    // ประกอบด้วย Step หนึ่งหรือหลาย Step ที่ถูกประมวลผลตามลำดับ
    @Bean
    public Job importUserJob(Step processUsersStep, Step cleanupStep) {
        return new JobBuilder("importUserJob", jobRepository)
                .start(processUsersStep)      // Step ประมวลผลหลัก
                .next(cleanupStep)             // Step ทำความสะอาด
                .build();
    }

    // Step แทนหน่วยงานที่เป็นอิสระ
    // มีสองแบบ: Tasklet (งานเดี่ยว) หรือ Chunk (ประมวลผลแบบวนซ้ำ)
    @Bean
    public Step processUsersStep(ItemReader<UserRecord> reader,
                                  ItemProcessor<UserRecord, User> processor,
                                  ItemWriter<User> writer) {
        return new StepBuilder("processUsersStep", jobRepository)
                .<UserRecord, User>chunk(100, transactionManager)  // commit ทุก 100 รายการ
                .reader(reader)       // อ่านข้อมูลต้นทาง
                .processor(processor) // แปลงแต่ละรายการ
                .writer(writer)       // เขียนเป็นชุดละ 100
                .build();
    }
}

JobRepository บันทึกสถานะการประมวลผลลงในฐานข้อมูล ความถาวรนี้ทำให้สามารถ restart job ที่ล้มเหลวได้ตรงจุดที่หยุด โดยไม่ต้องประมวลผลข้อมูลที่ commit แล้วซ้ำ

คำถามที่ 2: Tasklet กับการประมวลผลแบบ Chunk ต่างกันอย่างไร

Tasklet ดำเนินการแบบไม่วนซ้ำเพียงครั้งเดียว เช่น ลบไฟล์ เรียก stored procedure หรือส่งอีเมลแจ้งเตือน Chunk ประมวลผลปริมาณมหาศาลด้วยการแบ่งข้อมูลเป็นชุดย่อยที่จัดการได้

CleanupTasklet.javajava
// Tasklet: การกระทำเดียวโดยไม่วนซ้ำ
@Component
public class CleanupTasklet implements Tasklet {

    private final Path tempDirectory = Path.of("/tmp/batch-work");

    @Override
    public RepeatStatus execute(StepContribution contribution,
                                 ChunkContext chunkContext) throws Exception {
        // ลบไฟล์ชั่วคราวทั้งหมดของการประมวลผล
        try (var files = Files.walk(tempDirectory)) {
            files.filter(Files::isRegularFile)
                 .forEach(this::deleteQuietly);
        }

        // FINISHED แสดงว่า tasklet ทำงานเสร็จแล้ว
        // CONTINUABLE จะรันใหม่ (มีประโยชน์สำหรับ polling)
        return RepeatStatus.FINISHED;
    }

    private void deleteQuietly(Path file) {
        try {
            Files.delete(file);
        } catch (IOException e) {
            // บันทึก log แล้วทำต่อ - ไม่ทำให้ job ล้มเหลวเพราะไฟล์เดียว
        }
    }
}
ChunkProcessingConfig.javajava
// การประมวลผลแบบ chunk: รับมือปริมาณสูง
@Configuration
public class ChunkProcessingConfig {

    @Bean
    public Step processOrdersStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<OrderRecord> reader,
                                   ItemProcessor<OrderRecord, ProcessedOrder> processor,
                                   ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("processOrdersStep", jobRepository)
                // Chunk 500: อ่าน 500 รายการ ประมวลผล เขียน แล้ว commit
                .<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Listener เพื่อเฝ้าดูความคืบหน้า
                .listener(new ChunkProgressListener())
                .build();
    }
}

การประมวลผลแบบ chunk ให้ประโยชน์สำคัญ คือ จัดการหน่วยความจำได้อย่างมีประสิทธิภาพ (เก็บเฉพาะ chunk ปัจจุบัน) ทรานแซกชันแบบละเอียด (commit ต่อ chunk) และการกู้คืนเมื่อเกิดข้อผิดพลาดที่ chunk ที่ commit ล่าสุด

เจาะลึกการประมวลผลแบบ Chunk

คำถามที่ 3: วงจรชีวิตของ chunk ทำงานอย่างไร

แต่ละ chunk เดินตามวงจรที่แม่นยำ คือ อ่านรายการทีละชิ้นจนถึงขนาดที่กำหนด ประมวลผลแต่ละรายการแยกกัน แล้วจึงเขียนทั้งกลุ่ม โดยมีทรานแซกชันครอบทั้ง chunk

OrderItemReader.javajava
// ItemReader: อ่านครั้งละหนึ่งรายการ
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {

    // @StepScope: instance ใหม่ต่อการรัน step
    // เปิดให้ inject พารามิเตอร์ของ job แบบไดนามิก
    @Value("#{jobParameters['startDate']}")
    private LocalDate startDate;

    private Iterator<OrderRecord> orderIterator;

    @BeforeStep
    public void initializeReader(StepExecution stepExecution) {
        // โหลดข้อมูลตอนเริ่ม step
        List<OrderRecord> orders = fetchOrdersFromDate(startDate);
        this.orderIterator = orders.iterator();
    }

    @Override
    public OrderRecord read() {
        // คืน null เพื่อสื่อว่าหมดข้อมูล
        // Spring Batch จะเรียก read() จนกว่าจะได้ null
        if (orderIterator.hasNext()) {
            return orderIterator.next();
        }
        return null;  // จบ dataset
    }

    private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
        // ดึงจากแหล่งข้อมูล
        return List.of();  // การ implement จริง
    }
}
OrderItemProcessor.javajava
// ItemProcessor: แปลงแต่ละรายการแยกกัน
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {

    private final PricingService pricingService;
    private final ValidationService validationService;

    public OrderItemProcessor(PricingService pricingService,
                               ValidationService validationService) {
        this.pricingService = pricingService;
        this.validationService = validationService;
    }

    @Override
    public ProcessedOrder process(OrderRecord item) {
        // คืน null จะทำให้รายการถูกกรอง (ไม่ถูกเขียน)
        if (!validationService.isValid(item)) {
            return null;  // รายการถูกกรอง
        }

        // การแปลงเชิงธุรกิจ
        BigDecimal finalPrice = pricingService.calculatePrice(item);

        return new ProcessedOrder(
                item.orderId(),
                item.customerId(),
                finalPrice,
                LocalDateTime.now()
        );
    }
}
OrderItemWriter.javajava
// ItemWriter: เขียนทั้ง chunk ในการดำเนินการเดียว
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;

    public OrderItemWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // chunk บรรจุรายการที่ประมวลผลแล้วทั้งหมด
        // เขียนแบบ batch เพื่อประสิทธิภาพที่ดี
        List<? extends ProcessedOrder> items = chunk.getItems();

        jdbcTemplate.batchUpdate(
                "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
                items,
                items.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                }
        );
    }
}

หากเกิด exception ระหว่างประมวลผล chunk ทรานแซกชันจะถูก rollback จากนั้น job สามารถดำเนินต่อจาก chunk นั้นโดยอาศัย metadata ใน JobRepository

คำถามที่ 4: เลือกขนาด chunk ที่เหมาะสมอย่างไร

ขนาด chunk ส่งผลโดยตรงต่อประสิทธิภาพและการใช้หน่วยความจำ chunk ที่เล็กเกินไปจะเพิ่มจำนวน commit (overhead) chunk ที่ใหญ่เกินไปจะใช้หน่วยความจำมากและทำให้ rollback นานเมื่อเกิดข้อผิดพลาด

ChunkSizingConfig.javajava
// การกำหนดขนาด chunk แบบไดนามิก
@Configuration
public class ChunkSizingConfig {

    // ค่าเริ่มต้นที่เหมาะสมในกรณีทั่วไป
    private static final int DEFAULT_CHUNK_SIZE = 100;

    // สำหรับรายการเบา (มีฟิลด์น้อย)
    private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;

    // สำหรับรายการหนัก (blob, เอกสาร)
    private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;

    @Bean
    public Step processLightDataStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<LightRecord> reader,
                                      ItemWriter<LightRecord> writer) {
        return new StepBuilder("processLightDataStep", jobRepository)
                // รายการเบา: chunk ใหญ่ขึ้นเพื่อ commit น้อยลง
                .<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .writer(writer)
                .build();
    }

    @Bean
    public Step processDocumentsStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<Document> reader,
                                      ItemProcessor<Document, ProcessedDocument> processor,
                                      ItemWriter<ProcessedDocument> writer) {
        return new StepBuilder("processDocumentsStep", jobRepository)
                // เอกสารหนัก: chunk เล็กลงเพื่อจำกัดการใช้หน่วยความจำ
                .<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
กฎที่ใช้ได้จริง

เริ่มที่ 100 รายการต่อ chunk แล้วปรับตามตัวชี้วัด ได้แก่ เวลาที่ใช้ commit การใช้หน่วยความจำ และเวลาของ rollback ใช้ listener ในการตรวจสอบและหาจุดที่เหมาะสม

Partitioning เพื่อการประมวลผลแบบขนาน

คำถามที่ 5: Partitioning คืออะไรและควรใช้เมื่อใด

Partitioning แบ่ง dataset เป็น partition อิสระที่ประมวลผลขนานกัน แต่ละ partition รันใน thread ของตัวเอง (ภายในเครื่อง) หรือบน worker ระยะไกล แนวทางนี้เพิ่มอัตราการประมวลผลโดยไม่สูญเสียความสามารถในการ restart

PartitionedJobConfig.javajava
// การกำหนดค่า job แบบ partition
@Configuration
public class PartitionedJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public PartitionedJobConfig(JobRepository jobRepository,
                                 PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    @Bean
    public Job partitionedImportJob(Step partitionedStep) {
        return new JobBuilder("partitionedImportJob", jobRepository)
                .start(partitionedStep)
                .build();
    }

    // Manager step: ประสาน partition ต่างๆ
    @Bean
    public Step partitionedStep(Partitioner partitioner,
                                 Step workerStep,
                                 TaskExecutor taskExecutor) {
        return new StepBuilder("partitionedStep", jobRepository)
                // แบ่งงานผ่าน Partitioner
                .partitioner("workerStep", partitioner)
                // Step ที่จะรันให้แต่ละ partition
                .step(workerStep)
                // 8 thread ขนาน
                .taskExecutor(taskExecutor)
                // จำนวน partition ที่จะสร้าง
                .gridSize(8)
                .build();
    }

    // TaskExecutor สำหรับการรันแบบขนาน
    @Bean
    public TaskExecutor batchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("batch-partition-");
        executor.initialize();
        return executor;
    }
}
RangePartitioner.javajava
// Partitioner ตามช่วง ID
@Component
public class RangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public RangePartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // ดึงขอบเขตของ dataset
        Long minId = jdbcTemplate.queryForObject(
                "SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
        Long maxId = jdbcTemplate.queryForObject(
                "SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);

        if (minId == null || maxId == null) {
            return Map.of();  // ไม่มีข้อมูลให้ประมวลผล
        }

        // คำนวณขนาดของแต่ละ partition
        long range = (maxId - minId) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new HashMap<>();

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            long start = minId + (i * range);
            long end = Math.min(start + range - 1, maxId);

            // แต่ละ partition ได้รับขอบเขตของตน
            context.putLong("minId", start);
            context.putLong("maxId", end);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

Partitioning เหมาะกับ dataset ขนาดใหญ่ที่รายการเป็นอิสระจากกัน partition ต้องสมดุลเพื่อไม่ให้ partition ที่ช้ารั้ง job ทั้งหมดให้ช้าลง

คำถามที่ 6: Partitioning แบบ local กับ remote ต่างกันอย่างไร

Local partitioning รันทุก partition ใน JVM เดียวกันด้วย thread pool Remote partitioning กระจาย partition ไปยัง JVM หลายตัว (worker) ผ่าน middleware ส่งข้อความ

RemotePartitioningConfig.javajava
// การกำหนดค่า remote partitioning ด้วย messaging
@Configuration
public class RemotePartitioningConfig {

    @Bean
    public Step managerStep(JobRepository jobRepository,
                             Partitioner partitioner,
                             MessageChannelPartitionHandler partitionHandler) {
        return new StepBuilder("managerStep", jobRepository)
                .partitioner("workerStep", partitioner)
                // Handler สื่อสารกับ worker ระยะไกล
                .partitionHandler(partitionHandler)
                .build();
    }

    // PartitionHandler ส่ง ExecutionContext ให้ worker
    @Bean
    public MessageChannelPartitionHandler partitionHandler(
            MessagingTemplate messagingTemplate,
            JobExplorer jobExplorer) {
        MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
        handler.setStepName("workerStep");
        handler.setGridSize(4);
        handler.setMessagingOperations(messagingTemplate);
        handler.setJobExplorer(jobExplorer);
        // Timeout ของการรอให้ worker เสร็จ
        handler.setPollInterval(5000L);
        return handler;
    }
}
WorkerConfiguration.javajava
// การกำหนดค่าฝั่ง worker
@Configuration
public class WorkerConfiguration {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public WorkerConfiguration(JobRepository jobRepository,
                                PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Worker รับ partition และรัน step
    @Bean
    public Step workerStep(ItemReader<OrderRecord> reader,
                            ItemProcessor<OrderRecord, ProcessedOrder> processor,
                            ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("workerStep", jobRepository)
                .<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
                // Reader ที่ใช้ @StepScope เพื่อรับพารามิเตอร์ partition
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    // Reader ที่ใช้ขอบเขตของ partition
    @Bean
    @StepScope
    public JdbcCursorItemReader<OrderRecord> partitionedReader(
            DataSource dataSource,
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {
        return new JdbcCursorItemReaderBuilder<OrderRecord>()
                .name("partitionedOrderReader")
                .dataSource(dataSource)
                .sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
                .preparedStatementSetter(ps -> {
                    ps.setLong(1, minId);
                    ps.setLong(2, maxId);
                })
                .rowMapper(new OrderRecordRowMapper())
                .build();
    }
}

พร้อมที่จะพิชิตการสัมภาษณ์ Spring Boot แล้วหรือยังครับ?

ฝึกฝนด้วยตัวจำลองแบบโต้ตอบ, flashcards และแบบทดสอบเทคนิคครับ

Fault tolerance และการกู้คืนข้อผิดพลาด

คำถามที่ 7: Spring Batch มีกลไก fault tolerance อะไรบ้าง

Spring Batch มีกลไกที่เสริมกันสามแบบ ได้แก่ skip (ข้ามรายการที่ล้มเหลว), retry (ลองใหม่อัตโนมัติ) และ restart (กลับมาทำ job ที่ล้มเหลวต่อ) กลไกเหล่านี้ตั้งค่าที่ระดับ step

FaultTolerantStepConfig.javajava
// การกำหนดค่า fault tolerance อย่างครบถ้วน
@Configuration
public class FaultTolerantStepConfig {

    @Bean
    public Step faultTolerantStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<DataRecord> reader,
                                   ItemProcessor<DataRecord, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   SkipPolicy customSkipPolicy) {
        return new StepBuilder("faultTolerantStep", jobRepository)
                .<DataRecord, ProcessedRecord>chunk(100, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // เปิดโหมด fault tolerant
                .faultTolerant()
                // SKIP: ข้ามได้สูงสุด 10 ข้อผิดพลาดด้านการตรวจสอบ
                .skipLimit(10)
                .skip(ValidationException.class)
                .skip(DataIntegrityViolationException.class)
                // ข้อผิดพลาดบางอย่างห้ามข้าม
                .noSkip(FatalBatchException.class)
                // RETRY: ลองใหม่กับข้อผิดพลาดชั่วคราว
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .retry(DeadlockLoserDataAccessException.class)
                // Backoff แบบเอ็กซ์โพเนนเชียลระหว่างความพยายาม
                .backOffPolicy(exponentialBackOffPolicy())
                // Listener บันทึก log การข้าม
                .listener(skipListener())
                .build();
    }

    @Bean
    public BackOffPolicy exponentialBackOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);  // 1 วินาที
        policy.setMultiplier(2.0);         // เพิ่มเป็นสองเท่าต่อครั้ง
        policy.setMaxInterval(10000);      // สูงสุด 10 วินาที
        return policy;
    }

    @Bean
    public SkipListener<DataRecord, ProcessedRecord> skipListener() {
        return new SkipListener<>() {
            @Override
            public void onSkipInRead(Throwable t) {
                // บันทึกรายการที่อ่านไม่ได้
            }

            @Override
            public void onSkipInProcess(DataRecord item, Throwable t) {
                // บันทึกรายการที่ล้มเหลวระหว่างประมวลผล
            }

            @Override
            public void onSkipInWrite(ProcessedRecord item, Throwable t) {
                // บันทึกรายการที่ล้มเหลวระหว่างเขียน
            }
        };
    }
}

Retry เหมาะกับข้อผิดพลาดชั่วคราว เช่น timeout เครือข่ายหรือ deadlock ฐานข้อมูล Skip เหมาะกับข้อผิดพลาดข้อมูลรายตัวที่ไม่ควรหยุดการประมวลผลโดยรวม

คำถามที่ 8: ทำ SkipPolicy ที่ปรับแต่งเองอย่างไร

SkipPolicy ที่ปรับแต่งเองช่วยให้สามารถตัดสินใจอย่างละเอียด เช่น ข้ามตามประเภท exception จำนวนข้อผิดพลาด หรือเกณฑ์ทางธุรกิจเฉพาะ

AdaptiveSkipPolicy.javajava
// SkipPolicy ที่มี logic ทางธุรกิจขั้นสูง
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {

    private static final int MAX_SKIP_COUNT = 100;
    private static final double MAX_SKIP_PERCENTAGE = 0.05;  // ไม่เกิน 5%

    private final AtomicInteger totalProcessed = new AtomicInteger(0);
    private final AtomicInteger skipCount = new AtomicInteger(0);

    @Override
    public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
        // อย่าข้ามข้อผิดพลาดร้ายแรง
        if (exception instanceof FatalBatchException
                || exception instanceof OutOfMemoryError) {
            return false;
        }

        // เพดานการข้ามแบบเด็ดขาด
        if (skipCountSoFar >= MAX_SKIP_COUNT) {
            return false;  // หยุด job
        }

        // เพดานเป็นเปอร์เซ็นต์
        int total = totalProcessed.get();
        if (total > 1000) {  // ใช้หลังจากช่วงอุ่นเครื่อง
            double skipPercentage = (double) skipCountSoFar / total;
            if (skipPercentage > MAX_SKIP_PERCENTAGE) {
                return false;  // ข้อผิดพลาดเกินสัดส่วน
            }
        }

        // ข้ามข้อผิดพลาดด้านการตรวจสอบและข้อมูล
        return exception instanceof ValidationException
                || exception instanceof DataFormatException
                || exception instanceof IllegalArgumentException;
    }

    // ถูกเรียกโดย listener เพื่อติดตามความคืบหน้า
    public void incrementProcessed() {
        totalProcessed.incrementAndGet();
    }
}

คำถามที่ 9: การ restart job ที่ล้มเหลวทำงานอย่างไร

JobRepository เก็บสถานะของแต่ละการประมวลผล เมื่อ restart Spring Batch จะระบุ chunk ที่ commit ล่าสุดและกลับมาทำต่อจากจุดนั้น รายการที่ประมวลผลสำเร็จจะไม่ถูกประมวลผลซ้ำ

JobRestartService.javajava
// บริการจัดการการ restart job
@Service
public class JobRestartService {

    private final JobLauncher jobLauncher;
    private final JobExplorer jobExplorer;
    private final JobRepository jobRepository;
    private final Job importJob;

    public JobRestartService(JobLauncher jobLauncher,
                              JobExplorer jobExplorer,
                              JobRepository jobRepository,
                              @Qualifier("importJob") Job importJob) {
        this.jobLauncher = jobLauncher;
        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
        this.importJob = importJob;
    }

    public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
        // ดึงการประมวลผลที่ล้มเหลว
        JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);

        if (failedExecution == null) {
            throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
        }

        // ตรวจว่า job restart ได้
        if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
            throw new IllegalStateException("Only FAILED jobs can be restarted");
        }

        // ใช้พารามิเตอร์ชุดเดิม
        JobParameters originalParams = failedExecution.getJobParameters();

        // เรียก job ใหม่ - จะดำเนินต่ออัตโนมัติจาก checkpoint สุดท้าย
        return jobLauncher.run(importJob, originalParams);
    }

    public List<JobExecution> findRestartableJobs() {
        // แสดงการประมวลผล FAILED ทั้งหมดที่ยังไม่ได้ restart
        return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.FAILED)
                .filter(this::isRestartable)
                .toList();
    }

    private boolean isRestartable(JobExecution execution) {
        // ยืนยันว่าไม่มีการประมวลผลสำเร็จที่ใหม่กว่า
        JobInstance instance = execution.getJobInstance();
        return jobExplorer.getJobExecutions(instance).stream()
                .noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
    }
}
กับดักในการสัมภาษณ์

job จะ restart ได้เฉพาะเมื่อ JobParameters เหมือนเดิม การเปลี่ยนพารามิเตอร์จะสร้าง instance ใหม่และทำให้ประวัติความคืบหน้าหายไป

การสเกลและการปรับให้เหมาะสม

คำถามที่ 10: มีกลยุทธ์ในการสเกลแบบใดบ้าง

Spring Batch มีสี่กลยุทธ์ ได้แก่ multi-threaded step (หลาย thread อ่านขนานกัน), parallel steps (step อิสระทำงานขนาน), remote chunking (ประมวลผลแบบกระจาย) และ partitioning (ข้อมูลกระจาย)

MultiThreadedStepConfig.javajava
// Step แบบหลาย thread: หลาย thread ประมวลผล dataset เดียวกัน
@Configuration
public class MultiThreadedStepConfig {

    @Bean
    public Step multiThreadedStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<Record> reader,
                                   ItemProcessor<Record, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   TaskExecutor taskExecutor) {
        return new StepBuilder("multiThreadedStep", jobRepository)
                .<Record, ProcessedRecord>chunk(100, transactionManager)
                // ข้อควรระวัง: reader ต้อง thread-safe
                .reader(synchronizedReader(reader))
                .processor(processor)
                .writer(writer)
                // 4 thread ประมวลผล chunk ขนานกัน
                .taskExecutor(taskExecutor)
                .throttleLimit(4)
                .build();
    }

    // Wrapper เพื่อทำให้ reader thread-safe
    private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
        SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
        syncReader.setDelegate((ItemStreamReader<Record>) reader);
        return syncReader;
    }
}
ParallelStepsConfig.javajava
// รัน step อิสระแบบขนาน
@Configuration
public class ParallelStepsConfig {

    @Bean
    public Job parallelJob(JobRepository jobRepository,
                            Step loadCustomersStep,
                            Step loadProductsStep,
                            Step loadOrdersStep,
                            Step processDataStep) {
        // Flow ขนาน: customers และ products ถูกโหลดพร้อมกัน
        Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
                .start(loadCustomersStep)
                .build();

        Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
                .start(loadProductsStep)
                .build();

        Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
                .start(loadOrdersStep)
                .build();

        // Split รัน flow แบบขนาน
        return new JobBuilder("parallelJob", jobRepository)
                .start(new FlowBuilder<Flow>("parallelLoadFlow")
                        .split(new SimpleAsyncTaskExecutor())
                        .add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
                        .build())
                // หลังโหลดขนาน ค่อยประมวลผลตามลำดับ
                .next(processDataStep)
                .build()
                .build();
    }
}

Multi-threading เหมาะกับกรณีที่ reader ถูก synchronize ได้ Partitioning เป็นทางเลือกที่ดีกว่าสำหรับปริมาณมาก เพราะแต่ละ partition มี reader ของตัวเองโดยไม่แย่งทรัพยากร

คำถามที่ 11: เฝ้าติดตามประสิทธิภาพของ job อย่างไร

Spring Batch เปิดเผย metric ผ่าน listener และ JobRepository การเชื่อม Micrometer ช่วยส่งออกไปยัง Prometheus, Grafana หรือระบบ monitoring อื่น

BatchMetricsConfig.javajava
// การกำหนดค่า monitoring ด้วย Micrometer
@Configuration
public class BatchMetricsConfig {

    private final MeterRegistry meterRegistry;

    public BatchMetricsConfig(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Bean
    public JobExecutionListener metricsJobListener() {
        return new JobExecutionListener() {

            private Timer.Sample jobTimer;

            @Override
            public void beforeJob(JobExecution jobExecution) {
                // เริ่มจับเวลา job
                jobTimer = Timer.start(meterRegistry);
                Counter.builder("batch.job.started")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .register(meterRegistry)
                        .increment();
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                // บันทึกระยะเวลารวม
                jobTimer.stop(Timer.builder("batch.job.duration")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry));

                // ตัวนับ job ตามสถานะ
                Counter.builder("batch.job.completed")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry)
                        .increment();
            }
        };
    }

    @Bean
    public StepExecutionListener metricsStepListener() {
        return new StepExecutionListener() {

            @Override
            public void afterStep(StepExecution stepExecution) {
                String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
                String stepName = stepExecution.getStepName();

                // ตัวชี้วัด throughput
                Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                return null;
            }
        };
    }
}

คำถามที่ 12: มีกับดักทั่วไปอะไรบ้างเมื่อทำ partitioning

ข้อผิดพลาดที่พบบ่อย ได้แก่ partition ไม่สมดุล (partition เดียวมีข้อมูลถึง 90%) reader ที่ไม่ thread-safe และการจัดการสถานะระหว่าง partition ที่ผิดพลาด

BalancedPartitioner.javajava
// Partitioner ที่กระจายภาระอย่างสมดุลจริง
@Component
public class BalancedPartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // นับจำนวนรายการรวมที่จะประมวลผล
        Integer totalCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);

        if (totalCount == null || totalCount == 0) {
            return Map.of();
        }

        // คำนวณขนาดเป้าหมายต่อ partition
        int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);

        Map<String, ExecutionContext> partitions = new HashMap<>();

        // ใช้ OFFSET/LIMIT เพื่อ partition ที่สมดุล
        // มีต้นทุนสูงกว่าการใช้ช่วง แต่รับรองความสมดุล
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("offset", i * itemsPerPartition);
            context.putInt("limit", itemsPerPartition);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

// OffsetBasedReader.java
// Reader ที่เข้ากับ partitioning แบบ offset
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {

    private final JdbcTemplate jdbcTemplate;
    private Iterator<OrderRecord> iterator;

    @Value("#{stepExecutionContext['offset']}")
    private int offset;

    @Value("#{stepExecutionContext['limit']}")
    private int limit;

    public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // โหลดส่วนที่ partition นี้รับผิดชอบโดยตรง
        List<OrderRecord> records = jdbcTemplate.query(
                "SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
                new OrderRecordRowMapper(),
                limit, offset
        );
        this.iterator = records.iterator();
    }

    @Override
    public OrderRecord read() {
        return iterator.hasNext() ? iterator.next() : null;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // เก็บสถานะสำหรับ restart หากจำเป็น
    }

    @Override
    public void close() {
        // ทำความสะอาด
    }
}

คำถามขั้นสูงสำหรับซีเนียร์

คำถามที่ 13: จัดการการพึ่งพากันระหว่าง job อย่างไร

Spring Batch ไม่จัดการการพึ่งพาระหว่าง job แบบ native วิธีแก้ ได้แก่ ใช้ orchestrator ภายนอก (Airflow, Kubernetes CronJob) หรือ implement เองด้วย JobExplorer

JobDependencyService.javajava
// การจัดการการพึ่งพากันระหว่าง job
@Service
public class JobDependencyService {

    private final JobExplorer jobExplorer;
    private final JobLauncher jobLauncher;
    private final Map<String, Job> jobs;

    public JobDependencyService(JobExplorer jobExplorer,
                                  JobLauncher jobLauncher,
                                  Map<String, Job> jobs) {
        this.jobExplorer = jobExplorer;
        this.jobLauncher = jobLauncher;
        this.jobs = jobs;
    }

    public JobExecution runWithDependencies(String jobName,
                                             JobParameters params,
                                             List<String> dependsOn) throws Exception {
        // ตรวจสอบว่าทุก dependency สำเร็จ
        for (String dependency : dependsOn) {
            if (!hasSuccessfulExecution(dependency, params)) {
                throw new JobExecutionException(
                        "Dependency not satisfied: " + dependency);
            }
        }

        Job job = jobs.get(jobName);
        if (job == null) {
            throw new IllegalArgumentException("Unknown job: " + jobName);
        }

        return jobLauncher.run(job, params);
    }

    private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
        // ค้นหาการประมวลผล COMPLETED ที่ใช้พารามิเตอร์ทางธุรกิจเดียวกัน
        return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
                .anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
    }

    private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
        // เปรียบเทียบพารามิเตอร์ทางธุรกิจ (ไม่สนใจ timestamp ของการรัน)
        String actualDate = actual.getString("businessDate");
        String expectedDate = expected.getString("businessDate");
        return Objects.equals(actualDate, expectedDate);
    }
}

คำถามที่ 14: ทดสอบ job Spring Batch อย่างมีประสิทธิภาพอย่างไร

การทดสอบ job Spring Batch ต้องมีแนวทางหลายชั้น ได้แก่ unit test สำหรับ component (reader, processor, writer), integration test สำหรับ step และ end-to-end test สำหรับ job ทั้งหมด

OrderProcessorTest.javajava
// Unit test ของ processor
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {

    @Mock
    private PricingService pricingService;

    @Mock
    private ValidationService validationService;

    @InjectMocks
    private OrderItemProcessor processor;

    @Test
    void shouldProcessValidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(true);
        when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));

        // When
        ProcessedOrder result = processor.process(input);

        // Then
        assertThat(result).isNotNull();
        assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
    }

    @Test
    void shouldFilterInvalidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(false);

        // When
        ProcessedOrder result = processor.process(input);

        // Then - null หมายถึงถูกกรอง
        assertThat(result).isNull();
        verify(pricingService, never()).calculatePrice(any());
    }
}
ImportJobIntegrationTest.javajava
// Integration test ของ job ทั้งหมด
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @BeforeEach
    void setup() {
        // ล้าง metadata ระหว่างเทส
        jobRepositoryTestUtils.removeJobExecutions();
        // รีเซ็ตข้อมูลทดสอบ
        jdbcTemplate.execute("DELETE FROM processed_orders");
        jdbcTemplate.execute("DELETE FROM orders");
    }

    @Test
    void shouldCompleteJobSuccessfully() throws Exception {
        // Given - ข้อมูลทดสอบ
        insertTestOrders(100);

        // When
        JobParameters params = new JobParametersBuilder()
                .addLocalDate("businessDate", LocalDate.now())
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncherTestUtils.launchJob(params);

        // Then
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(countProcessedOrders()).isEqualTo(100);
    }

    @Test
    void shouldHandleEmptyDataset() throws Exception {
        // Given - ไม่มีข้อมูล

        // When
        JobExecution execution = jobLauncherTestUtils.launchJob();

        // Then - job สำเร็จแม้ไม่มีข้อมูล
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    @Test
    void shouldRestartFromFailurePoint() throws Exception {
        // Given - จำลองข้อผิดพลาดกลางการประมวลผล
        insertTestOrders(100);
        insertPoisonOrder(50);  // ทำให้เกิดข้อผิดพลาด

        // When - การรันครั้งแรกล้มเหลว
        JobExecution firstExecution = jobLauncherTestUtils.launchJob();
        assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);

        // แก้ข้อมูล
        removePoisonOrder(50);

        // When - restart
        JobExecution restartExecution = jobLauncherTestUtils.launchJob(
                firstExecution.getJobParameters());

        // Then - ดำเนินต่อจากจุดที่ล้มเหลว
        assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    private void insertTestOrders(int count) {
        for (int i = 1; i <= count; i++) {
            jdbcTemplate.update(
                    "INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
                    i, i * 10, BigDecimal.valueOf(i * 10));
        }
    }

    private int countProcessedOrders() {
        return jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processed_orders", Integer.class);
    }
}

คำถามที่ 15: ปรับประสิทธิภาพการเขียนลงฐานข้อมูลอย่างไร

การเขียนมักเป็นคอขวด การปรับให้เหมาะสมรวมถึง JDBC batch insert ปิด constraint ระหว่างโหลด และใช้ตาราง staging

OptimizedJdbcWriter.javajava
// Writer ที่ถูกปรับให้รองรับปริมาณสูง
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;
    private final DataSource dataSource;

    public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
        this.jdbcTemplate = jdbcTemplate;
        this.dataSource = dataSource;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
        List<? extends ProcessedOrder> items = chunk.getItems();

        if (items.isEmpty()) {
            return;
        }

        // ใช้ PreparedStatement แบบ batch
        try (Connection connection = dataSource.getConnection();
             PreparedStatement ps = connection.prepareStatement(
                     "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
                             "VALUES (?, ?, ?, ?)")) {

            for (ProcessedOrder order : items) {
                ps.setLong(1, order.orderId());
                ps.setLong(2, order.customerId());
                ps.setBigDecimal(3, order.finalPrice());
                ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                ps.addBatch();
            }

            // ดำเนินการ insert ทั้งหมดในการสื่อสารเครือข่ายเดียว
            ps.executeBatch();
        }
    }
}

// StagingTableWriter.java
// รูปแบบ staging table สำหรับปริมาณมหาศาล
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {

    private final JdbcTemplate jdbcTemplate;
    private String stagingTable;

    public StagingTableWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // สร้างตารางชั่วคราวสำหรับ step นี้
        stagingTable = "staging_orders_" + stepExecution.getId();
        jdbcTemplate.execute(
                "CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // เขียนเข้า staging table (ไม่มี FK constraint)
        String sql = "INSERT INTO " + stagingTable +
                " (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";

        jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                });
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
            // คัดลอกแบบ bulk ไปยังตารางหลัก
            jdbcTemplate.execute(
                    "INSERT INTO processed_orders SELECT * FROM " + stagingTable);
        }
        // ล้าง staging table
        jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
        return stepExecution.getExitStatus();
    }
}

บทสรุป

การเชี่ยวชาญ Spring Batch 5 ในการสัมภาษณ์ทางเทคนิคขึ้นอยู่กับความเข้าใจกลไกภายในอย่างลึกซึ้ง

สถาปัตยกรรม: Job → Step → Chunk (Reader, Processor, Writer)

การประมวลผลแบบ chunk: การกำหนดขนาด วงจรชีวิต ทรานแซกชัน

Partitioning: local กับ remote การถ่วงดุล partition

Fault tolerance: skip, retry, restart พร้อมนโยบายที่เหมาะสม

การสเกล: multi-threading, parallel steps, remote chunking

การทดสอบ: unit, integration, end-to-end

การปรับแต่ง: batch writes, staging tables, การเฝ้าติดตาม

คำถามขั้นสูงประเมินความสามารถในการให้เหตุผลแก่การตัดสินใจทางสถาปัตยกรรมตามบริบท ได้แก่ ปริมาณข้อมูล ข้อจำกัดด้านเวลา ระดับความทนทานต่อข้อผิดพลาด และโครงสร้างพื้นฐานที่มี

เริ่มฝึกซ้อมเลย!

ทดสอบความรู้ของคุณด้วยตัวจำลองสัมภาษณ์และแบบทดสอบเทคนิคครับ

แท็ก

#spring batch
#spring boot
#java
#batch processing
#interview questions

แชร์

บทความที่เกี่ยวข้อง