สัมภาษณ์ Spring Batch 5: Partitioning, Chunk และ Fault Tolerance
เชี่ยวชาญการสัมภาษณ์ Spring Batch 5: 15 คำถามสำคัญเกี่ยวกับ partitioning การประมวลผลแบบ chunk และความทนทานต่อข้อผิดพลาด พร้อมตัวอย่างโค้ด Java 21

Spring Batch 5 เป็นเสาหลักของการประมวลผลข้อมูลขนาดใหญ่ในระบบนิเวศ Spring การสัมภาษณ์ทางเทคนิคประเมินความสามารถในการออกแบบ job ที่แข็งแกร่ง ขยายขนาดได้ และทนทานต่อความล้มเหลว ความเชี่ยวชาญใน partitioning, การประมวลผลแบบ chunk และกลไกความทนทานต่อข้อผิดพลาดทำให้นักพัฒนาระดับซีเนียร์โดดเด่น
ผู้สรรหาทดสอบความเข้าใจเชิงลึก ทำไมจึงเลือก partitioning แทน remote chunking และจะกำหนดขนาด chunk ที่เหมาะสมอย่างไร การตัดสินใจเชิงสถาปัตยกรรมเหล่านี้สะท้อนประสบการณ์การใช้งานจริง
สถาปัตยกรรมหลักของ Spring Batch 5
คำถามที่ 1: ส่วนประกอบหลักของ Spring Batch มีอะไรบ้าง
สถาปัตยกรรม Spring Batch ประกอบด้วยสามชั้น ได้แก่ Application (job และโค้ดธุรกิจ), Batch Core (คลาสรันไทม์เพื่อเริ่มและควบคุม job) และ Infrastructure (reader, writer และบริการทั่วไปเช่น RetryTemplate)
// การกำหนดค่า job ของ Spring Batch 5 ด้วย Java 21
@Configuration
public class BatchJobConfig {
// JobRepository เก็บ metadata การประมวลผล
// เปิดให้สามารถ restart และเฝ้าติดตาม job ได้
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public BatchJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Job ห่อหุ้มกระบวนการ batch ทั้งหมด
// ประกอบด้วย Step หนึ่งหรือหลาย Step ที่ถูกประมวลผลตามลำดับ
@Bean
public Job importUserJob(Step processUsersStep, Step cleanupStep) {
return new JobBuilder("importUserJob", jobRepository)
.start(processUsersStep) // Step ประมวลผลหลัก
.next(cleanupStep) // Step ทำความสะอาด
.build();
}
// Step แทนหน่วยงานที่เป็นอิสระ
// มีสองแบบ: Tasklet (งานเดี่ยว) หรือ Chunk (ประมวลผลแบบวนซ้ำ)
@Bean
public Step processUsersStep(ItemReader<UserRecord> reader,
ItemProcessor<UserRecord, User> processor,
ItemWriter<User> writer) {
return new StepBuilder("processUsersStep", jobRepository)
.<UserRecord, User>chunk(100, transactionManager) // commit ทุก 100 รายการ
.reader(reader) // อ่านข้อมูลต้นทาง
.processor(processor) // แปลงแต่ละรายการ
.writer(writer) // เขียนเป็นชุดละ 100
.build();
}
}JobRepository บันทึกสถานะการประมวลผลลงในฐานข้อมูล ความถาวรนี้ทำให้สามารถ restart job ที่ล้มเหลวได้ตรงจุดที่หยุด โดยไม่ต้องประมวลผลข้อมูลที่ commit แล้วซ้ำ
คำถามที่ 2: Tasklet กับการประมวลผลแบบ Chunk ต่างกันอย่างไร
Tasklet ดำเนินการแบบไม่วนซ้ำเพียงครั้งเดียว เช่น ลบไฟล์ เรียก stored procedure หรือส่งอีเมลแจ้งเตือน Chunk ประมวลผลปริมาณมหาศาลด้วยการแบ่งข้อมูลเป็นชุดย่อยที่จัดการได้
// Tasklet: การกระทำเดียวโดยไม่วนซ้ำ
@Component
public class CleanupTasklet implements Tasklet {
private final Path tempDirectory = Path.of("/tmp/batch-work");
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// ลบไฟล์ชั่วคราวทั้งหมดของการประมวลผล
try (var files = Files.walk(tempDirectory)) {
files.filter(Files::isRegularFile)
.forEach(this::deleteQuietly);
}
// FINISHED แสดงว่า tasklet ทำงานเสร็จแล้ว
// CONTINUABLE จะรันใหม่ (มีประโยชน์สำหรับ polling)
return RepeatStatus.FINISHED;
}
private void deleteQuietly(Path file) {
try {
Files.delete(file);
} catch (IOException e) {
// บันทึก log แล้วทำต่อ - ไม่ทำให้ job ล้มเหลวเพราะไฟล์เดียว
}
}
}// การประมวลผลแบบ chunk: รับมือปริมาณสูง
@Configuration
public class ChunkProcessingConfig {
@Bean
public Step processOrdersStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("processOrdersStep", jobRepository)
// Chunk 500: อ่าน 500 รายการ ประมวลผล เขียน แล้ว commit
.<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Listener เพื่อเฝ้าดูความคืบหน้า
.listener(new ChunkProgressListener())
.build();
}
}การประมวลผลแบบ chunk ให้ประโยชน์สำคัญ คือ จัดการหน่วยความจำได้อย่างมีประสิทธิภาพ (เก็บเฉพาะ chunk ปัจจุบัน) ทรานแซกชันแบบละเอียด (commit ต่อ chunk) และการกู้คืนเมื่อเกิดข้อผิดพลาดที่ chunk ที่ commit ล่าสุด
เจาะลึกการประมวลผลแบบ Chunk
คำถามที่ 3: วงจรชีวิตของ chunk ทำงานอย่างไร
แต่ละ chunk เดินตามวงจรที่แม่นยำ คือ อ่านรายการทีละชิ้นจนถึงขนาดที่กำหนด ประมวลผลแต่ละรายการแยกกัน แล้วจึงเขียนทั้งกลุ่ม โดยมีทรานแซกชันครอบทั้ง chunk
// ItemReader: อ่านครั้งละหนึ่งรายการ
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {
// @StepScope: instance ใหม่ต่อการรัน step
// เปิดให้ inject พารามิเตอร์ของ job แบบไดนามิก
@Value("#{jobParameters['startDate']}")
private LocalDate startDate;
private Iterator<OrderRecord> orderIterator;
@BeforeStep
public void initializeReader(StepExecution stepExecution) {
// โหลดข้อมูลตอนเริ่ม step
List<OrderRecord> orders = fetchOrdersFromDate(startDate);
this.orderIterator = orders.iterator();
}
@Override
public OrderRecord read() {
// คืน null เพื่อสื่อว่าหมดข้อมูล
// Spring Batch จะเรียก read() จนกว่าจะได้ null
if (orderIterator.hasNext()) {
return orderIterator.next();
}
return null; // จบ dataset
}
private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
// ดึงจากแหล่งข้อมูล
return List.of(); // การ implement จริง
}
}// ItemProcessor: แปลงแต่ละรายการแยกกัน
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {
private final PricingService pricingService;
private final ValidationService validationService;
public OrderItemProcessor(PricingService pricingService,
ValidationService validationService) {
this.pricingService = pricingService;
this.validationService = validationService;
}
@Override
public ProcessedOrder process(OrderRecord item) {
// คืน null จะทำให้รายการถูกกรอง (ไม่ถูกเขียน)
if (!validationService.isValid(item)) {
return null; // รายการถูกกรอง
}
// การแปลงเชิงธุรกิจ
BigDecimal finalPrice = pricingService.calculatePrice(item);
return new ProcessedOrder(
item.orderId(),
item.customerId(),
finalPrice,
LocalDateTime.now()
);
}
}// ItemWriter: เขียนทั้ง chunk ในการดำเนินการเดียว
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
public OrderItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// chunk บรรจุรายการที่ประมวลผลแล้วทั้งหมด
// เขียนแบบ batch เพื่อประสิทธิภาพที่ดี
List<? extends ProcessedOrder> items = chunk.getItems();
jdbcTemplate.batchUpdate(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
items,
items.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
}
);
}
}หากเกิด exception ระหว่างประมวลผล chunk ทรานแซกชันจะถูก rollback จากนั้น job สามารถดำเนินต่อจาก chunk นั้นโดยอาศัย metadata ใน JobRepository
คำถามที่ 4: เลือกขนาด chunk ที่เหมาะสมอย่างไร
ขนาด chunk ส่งผลโดยตรงต่อประสิทธิภาพและการใช้หน่วยความจำ chunk ที่เล็กเกินไปจะเพิ่มจำนวน commit (overhead) chunk ที่ใหญ่เกินไปจะใช้หน่วยความจำมากและทำให้ rollback นานเมื่อเกิดข้อผิดพลาด
// การกำหนดขนาด chunk แบบไดนามิก
@Configuration
public class ChunkSizingConfig {
// ค่าเริ่มต้นที่เหมาะสมในกรณีทั่วไป
private static final int DEFAULT_CHUNK_SIZE = 100;
// สำหรับรายการเบา (มีฟิลด์น้อย)
private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;
// สำหรับรายการหนัก (blob, เอกสาร)
private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;
@Bean
public Step processLightDataStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<LightRecord> reader,
ItemWriter<LightRecord> writer) {
return new StepBuilder("processLightDataStep", jobRepository)
// รายการเบา: chunk ใหญ่ขึ้นเพื่อ commit น้อยลง
.<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Step processDocumentsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<Document> reader,
ItemProcessor<Document, ProcessedDocument> processor,
ItemWriter<ProcessedDocument> writer) {
return new StepBuilder("processDocumentsStep", jobRepository)
// เอกสารหนัก: chunk เล็กลงเพื่อจำกัดการใช้หน่วยความจำ
.<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}เริ่มที่ 100 รายการต่อ chunk แล้วปรับตามตัวชี้วัด ได้แก่ เวลาที่ใช้ commit การใช้หน่วยความจำ และเวลาของ rollback ใช้ listener ในการตรวจสอบและหาจุดที่เหมาะสม
Partitioning เพื่อการประมวลผลแบบขนาน
คำถามที่ 5: Partitioning คืออะไรและควรใช้เมื่อใด
Partitioning แบ่ง dataset เป็น partition อิสระที่ประมวลผลขนานกัน แต่ละ partition รันใน thread ของตัวเอง (ภายในเครื่อง) หรือบน worker ระยะไกล แนวทางนี้เพิ่มอัตราการประมวลผลโดยไม่สูญเสียความสามารถในการ restart
// การกำหนดค่า job แบบ partition
@Configuration
public class PartitionedJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public PartitionedJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
@Bean
public Job partitionedImportJob(Step partitionedStep) {
return new JobBuilder("partitionedImportJob", jobRepository)
.start(partitionedStep)
.build();
}
// Manager step: ประสาน partition ต่างๆ
@Bean
public Step partitionedStep(Partitioner partitioner,
Step workerStep,
TaskExecutor taskExecutor) {
return new StepBuilder("partitionedStep", jobRepository)
// แบ่งงานผ่าน Partitioner
.partitioner("workerStep", partitioner)
// Step ที่จะรันให้แต่ละ partition
.step(workerStep)
// 8 thread ขนาน
.taskExecutor(taskExecutor)
// จำนวน partition ที่จะสร้าง
.gridSize(8)
.build();
}
// TaskExecutor สำหรับการรันแบบขนาน
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("batch-partition-");
executor.initialize();
return executor;
}
}// Partitioner ตามช่วง ID
@Component
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// ดึงขอบเขตของ dataset
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);
if (minId == null || maxId == null) {
return Map.of(); // ไม่มีข้อมูลให้ประมวลผล
}
// คำนวณขนาดของแต่ละ partition
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long start = minId + (i * range);
long end = Math.min(start + range - 1, maxId);
// แต่ละ partition ได้รับขอบเขตของตน
context.putLong("minId", start);
context.putLong("maxId", end);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}Partitioning เหมาะกับ dataset ขนาดใหญ่ที่รายการเป็นอิสระจากกัน partition ต้องสมดุลเพื่อไม่ให้ partition ที่ช้ารั้ง job ทั้งหมดให้ช้าลง
คำถามที่ 6: Partitioning แบบ local กับ remote ต่างกันอย่างไร
Local partitioning รันทุก partition ใน JVM เดียวกันด้วย thread pool Remote partitioning กระจาย partition ไปยัง JVM หลายตัว (worker) ผ่าน middleware ส่งข้อความ
// การกำหนดค่า remote partitioning ด้วย messaging
@Configuration
public class RemotePartitioningConfig {
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
MessageChannelPartitionHandler partitionHandler) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
// Handler สื่อสารกับ worker ระยะไกล
.partitionHandler(partitionHandler)
.build();
}
// PartitionHandler ส่ง ExecutionContext ให้ worker
@Bean
public MessageChannelPartitionHandler partitionHandler(
MessagingTemplate messagingTemplate,
JobExplorer jobExplorer) {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("workerStep");
handler.setGridSize(4);
handler.setMessagingOperations(messagingTemplate);
handler.setJobExplorer(jobExplorer);
// Timeout ของการรอให้ worker เสร็จ
handler.setPollInterval(5000L);
return handler;
}
}// การกำหนดค่าฝั่ง worker
@Configuration
public class WorkerConfiguration {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public WorkerConfiguration(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Worker รับ partition และรัน step
@Bean
public Step workerStep(ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("workerStep", jobRepository)
.<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
// Reader ที่ใช้ @StepScope เพื่อรับพารามิเตอร์ partition
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// Reader ที่ใช้ขอบเขตของ partition
@Bean
@StepScope
public JdbcCursorItemReader<OrderRecord> partitionedReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcCursorItemReaderBuilder<OrderRecord>()
.name("partitionedOrderReader")
.dataSource(dataSource)
.sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
.preparedStatementSetter(ps -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.rowMapper(new OrderRecordRowMapper())
.build();
}
}พร้อมที่จะพิชิตการสัมภาษณ์ Spring Boot แล้วหรือยังครับ?
ฝึกฝนด้วยตัวจำลองแบบโต้ตอบ, flashcards และแบบทดสอบเทคนิคครับ
Fault tolerance และการกู้คืนข้อผิดพลาด
คำถามที่ 7: Spring Batch มีกลไก fault tolerance อะไรบ้าง
Spring Batch มีกลไกที่เสริมกันสามแบบ ได้แก่ skip (ข้ามรายการที่ล้มเหลว), retry (ลองใหม่อัตโนมัติ) และ restart (กลับมาทำ job ที่ล้มเหลวต่อ) กลไกเหล่านี้ตั้งค่าที่ระดับ step
// การกำหนดค่า fault tolerance อย่างครบถ้วน
@Configuration
public class FaultTolerantStepConfig {
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<DataRecord> reader,
ItemProcessor<DataRecord, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
SkipPolicy customSkipPolicy) {
return new StepBuilder("faultTolerantStep", jobRepository)
.<DataRecord, ProcessedRecord>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// เปิดโหมด fault tolerant
.faultTolerant()
// SKIP: ข้ามได้สูงสุด 10 ข้อผิดพลาดด้านการตรวจสอบ
.skipLimit(10)
.skip(ValidationException.class)
.skip(DataIntegrityViolationException.class)
// ข้อผิดพลาดบางอย่างห้ามข้าม
.noSkip(FatalBatchException.class)
// RETRY: ลองใหม่กับข้อผิดพลาดชั่วคราว
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(DeadlockLoserDataAccessException.class)
// Backoff แบบเอ็กซ์โพเนนเชียลระหว่างความพยายาม
.backOffPolicy(exponentialBackOffPolicy())
// Listener บันทึก log การข้าม
.listener(skipListener())
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000); // 1 วินาที
policy.setMultiplier(2.0); // เพิ่มเป็นสองเท่าต่อครั้ง
policy.setMaxInterval(10000); // สูงสุด 10 วินาที
return policy;
}
@Bean
public SkipListener<DataRecord, ProcessedRecord> skipListener() {
return new SkipListener<>() {
@Override
public void onSkipInRead(Throwable t) {
// บันทึกรายการที่อ่านไม่ได้
}
@Override
public void onSkipInProcess(DataRecord item, Throwable t) {
// บันทึกรายการที่ล้มเหลวระหว่างประมวลผล
}
@Override
public void onSkipInWrite(ProcessedRecord item, Throwable t) {
// บันทึกรายการที่ล้มเหลวระหว่างเขียน
}
};
}
}Retry เหมาะกับข้อผิดพลาดชั่วคราว เช่น timeout เครือข่ายหรือ deadlock ฐานข้อมูล Skip เหมาะกับข้อผิดพลาดข้อมูลรายตัวที่ไม่ควรหยุดการประมวลผลโดยรวม
คำถามที่ 8: ทำ SkipPolicy ที่ปรับแต่งเองอย่างไร
SkipPolicy ที่ปรับแต่งเองช่วยให้สามารถตัดสินใจอย่างละเอียด เช่น ข้ามตามประเภท exception จำนวนข้อผิดพลาด หรือเกณฑ์ทางธุรกิจเฉพาะ
// SkipPolicy ที่มี logic ทางธุรกิจขั้นสูง
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 100;
private static final double MAX_SKIP_PERCENTAGE = 0.05; // ไม่เกิน 5%
private final AtomicInteger totalProcessed = new AtomicInteger(0);
private final AtomicInteger skipCount = new AtomicInteger(0);
@Override
public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
// อย่าข้ามข้อผิดพลาดร้ายแรง
if (exception instanceof FatalBatchException
|| exception instanceof OutOfMemoryError) {
return false;
}
// เพดานการข้ามแบบเด็ดขาด
if (skipCountSoFar >= MAX_SKIP_COUNT) {
return false; // หยุด job
}
// เพดานเป็นเปอร์เซ็นต์
int total = totalProcessed.get();
if (total > 1000) { // ใช้หลังจากช่วงอุ่นเครื่อง
double skipPercentage = (double) skipCountSoFar / total;
if (skipPercentage > MAX_SKIP_PERCENTAGE) {
return false; // ข้อผิดพลาดเกินสัดส่วน
}
}
// ข้ามข้อผิดพลาดด้านการตรวจสอบและข้อมูล
return exception instanceof ValidationException
|| exception instanceof DataFormatException
|| exception instanceof IllegalArgumentException;
}
// ถูกเรียกโดย listener เพื่อติดตามความคืบหน้า
public void incrementProcessed() {
totalProcessed.incrementAndGet();
}
}คำถามที่ 9: การ restart job ที่ล้มเหลวทำงานอย่างไร
JobRepository เก็บสถานะของแต่ละการประมวลผล เมื่อ restart Spring Batch จะระบุ chunk ที่ commit ล่าสุดและกลับมาทำต่อจากจุดนั้น รายการที่ประมวลผลสำเร็จจะไม่ถูกประมวลผลซ้ำ
// บริการจัดการการ restart job
@Service
public class JobRestartService {
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final Job importJob;
public JobRestartService(JobLauncher jobLauncher,
JobExplorer jobExplorer,
JobRepository jobRepository,
@Qualifier("importJob") Job importJob) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.importJob = importJob;
}
public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
// ดึงการประมวลผลที่ล้มเหลว
JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);
if (failedExecution == null) {
throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
}
// ตรวจว่า job restart ได้
if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
throw new IllegalStateException("Only FAILED jobs can be restarted");
}
// ใช้พารามิเตอร์ชุดเดิม
JobParameters originalParams = failedExecution.getJobParameters();
// เรียก job ใหม่ - จะดำเนินต่ออัตโนมัติจาก checkpoint สุดท้าย
return jobLauncher.run(importJob, originalParams);
}
public List<JobExecution> findRestartableJobs() {
// แสดงการประมวลผล FAILED ทั้งหมดที่ยังไม่ได้ restart
return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.FAILED)
.filter(this::isRestartable)
.toList();
}
private boolean isRestartable(JobExecution execution) {
// ยืนยันว่าไม่มีการประมวลผลสำเร็จที่ใหม่กว่า
JobInstance instance = execution.getJobInstance();
return jobExplorer.getJobExecutions(instance).stream()
.noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
}
}job จะ restart ได้เฉพาะเมื่อ JobParameters เหมือนเดิม การเปลี่ยนพารามิเตอร์จะสร้าง instance ใหม่และทำให้ประวัติความคืบหน้าหายไป
การสเกลและการปรับให้เหมาะสม
คำถามที่ 10: มีกลยุทธ์ในการสเกลแบบใดบ้าง
Spring Batch มีสี่กลยุทธ์ ได้แก่ multi-threaded step (หลาย thread อ่านขนานกัน), parallel steps (step อิสระทำงานขนาน), remote chunking (ประมวลผลแบบกระจาย) และ partitioning (ข้อมูลกระจาย)
// Step แบบหลาย thread: หลาย thread ประมวลผล dataset เดียวกัน
@Configuration
public class MultiThreadedStepConfig {
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<Record> reader,
ItemProcessor<Record, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
TaskExecutor taskExecutor) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Record, ProcessedRecord>chunk(100, transactionManager)
// ข้อควรระวัง: reader ต้อง thread-safe
.reader(synchronizedReader(reader))
.processor(processor)
.writer(writer)
// 4 thread ประมวลผล chunk ขนานกัน
.taskExecutor(taskExecutor)
.throttleLimit(4)
.build();
}
// Wrapper เพื่อทำให้ reader thread-safe
private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate((ItemStreamReader<Record>) reader);
return syncReader;
}
}// รัน step อิสระแบบขนาน
@Configuration
public class ParallelStepsConfig {
@Bean
public Job parallelJob(JobRepository jobRepository,
Step loadCustomersStep,
Step loadProductsStep,
Step loadOrdersStep,
Step processDataStep) {
// Flow ขนาน: customers และ products ถูกโหลดพร้อมกัน
Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
.start(loadCustomersStep)
.build();
Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
.start(loadProductsStep)
.build();
Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
.start(loadOrdersStep)
.build();
// Split รัน flow แบบขนาน
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("parallelLoadFlow")
.split(new SimpleAsyncTaskExecutor())
.add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
.build())
// หลังโหลดขนาน ค่อยประมวลผลตามลำดับ
.next(processDataStep)
.build()
.build();
}
}Multi-threading เหมาะกับกรณีที่ reader ถูก synchronize ได้ Partitioning เป็นทางเลือกที่ดีกว่าสำหรับปริมาณมาก เพราะแต่ละ partition มี reader ของตัวเองโดยไม่แย่งทรัพยากร
คำถามที่ 11: เฝ้าติดตามประสิทธิภาพของ job อย่างไร
Spring Batch เปิดเผย metric ผ่าน listener และ JobRepository การเชื่อม Micrometer ช่วยส่งออกไปยัง Prometheus, Grafana หรือระบบ monitoring อื่น
// การกำหนดค่า monitoring ด้วย Micrometer
@Configuration
public class BatchMetricsConfig {
private final MeterRegistry meterRegistry;
public BatchMetricsConfig(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Bean
public JobExecutionListener metricsJobListener() {
return new JobExecutionListener() {
private Timer.Sample jobTimer;
@Override
public void beforeJob(JobExecution jobExecution) {
// เริ่มจับเวลา job
jobTimer = Timer.start(meterRegistry);
Counter.builder("batch.job.started")
.tag("job", jobExecution.getJobInstance().getJobName())
.register(meterRegistry)
.increment();
}
@Override
public void afterJob(JobExecution jobExecution) {
// บันทึกระยะเวลารวม
jobTimer.stop(Timer.builder("batch.job.duration")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry));
// ตัวนับ job ตามสถานะ
Counter.builder("batch.job.completed")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry)
.increment();
}
};
}
@Bean
public StepExecutionListener metricsStepListener() {
return new StepExecutionListener() {
@Override
public void afterStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
String stepName = stepExecution.getStepName();
// ตัวชี้วัด throughput
Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
return null;
}
};
}
}คำถามที่ 12: มีกับดักทั่วไปอะไรบ้างเมื่อทำ partitioning
ข้อผิดพลาดที่พบบ่อย ได้แก่ partition ไม่สมดุล (partition เดียวมีข้อมูลถึง 90%) reader ที่ไม่ thread-safe และการจัดการสถานะระหว่าง partition ที่ผิดพลาด
// Partitioner ที่กระจายภาระอย่างสมดุลจริง
@Component
public class BalancedPartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// นับจำนวนรายการรวมที่จะประมวลผล
Integer totalCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);
if (totalCount == null || totalCount == 0) {
return Map.of();
}
// คำนวณขนาดเป้าหมายต่อ partition
int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);
Map<String, ExecutionContext> partitions = new HashMap<>();
// ใช้ OFFSET/LIMIT เพื่อ partition ที่สมดุล
// มีต้นทุนสูงกว่าการใช้ช่วง แต่รับรองความสมดุล
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("offset", i * itemsPerPartition);
context.putInt("limit", itemsPerPartition);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}
// OffsetBasedReader.java
// Reader ที่เข้ากับ partitioning แบบ offset
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {
private final JdbcTemplate jdbcTemplate;
private Iterator<OrderRecord> iterator;
@Value("#{stepExecutionContext['offset']}")
private int offset;
@Value("#{stepExecutionContext['limit']}")
private int limit;
public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void open(ExecutionContext executionContext) {
// โหลดส่วนที่ partition นี้รับผิดชอบโดยตรง
List<OrderRecord> records = jdbcTemplate.query(
"SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
new OrderRecordRowMapper(),
limit, offset
);
this.iterator = records.iterator();
}
@Override
public OrderRecord read() {
return iterator.hasNext() ? iterator.next() : null;
}
@Override
public void update(ExecutionContext executionContext) {
// เก็บสถานะสำหรับ restart หากจำเป็น
}
@Override
public void close() {
// ทำความสะอาด
}
}คำถามขั้นสูงสำหรับซีเนียร์
คำถามที่ 13: จัดการการพึ่งพากันระหว่าง job อย่างไร
Spring Batch ไม่จัดการการพึ่งพาระหว่าง job แบบ native วิธีแก้ ได้แก่ ใช้ orchestrator ภายนอก (Airflow, Kubernetes CronJob) หรือ implement เองด้วย JobExplorer
// การจัดการการพึ่งพากันระหว่าง job
@Service
public class JobDependencyService {
private final JobExplorer jobExplorer;
private final JobLauncher jobLauncher;
private final Map<String, Job> jobs;
public JobDependencyService(JobExplorer jobExplorer,
JobLauncher jobLauncher,
Map<String, Job> jobs) {
this.jobExplorer = jobExplorer;
this.jobLauncher = jobLauncher;
this.jobs = jobs;
}
public JobExecution runWithDependencies(String jobName,
JobParameters params,
List<String> dependsOn) throws Exception {
// ตรวจสอบว่าทุก dependency สำเร็จ
for (String dependency : dependsOn) {
if (!hasSuccessfulExecution(dependency, params)) {
throw new JobExecutionException(
"Dependency not satisfied: " + dependency);
}
}
Job job = jobs.get(jobName);
if (job == null) {
throw new IllegalArgumentException("Unknown job: " + jobName);
}
return jobLauncher.run(job, params);
}
private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
// ค้นหาการประมวลผล COMPLETED ที่ใช้พารามิเตอร์ทางธุรกิจเดียวกัน
return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
.anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
}
private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
// เปรียบเทียบพารามิเตอร์ทางธุรกิจ (ไม่สนใจ timestamp ของการรัน)
String actualDate = actual.getString("businessDate");
String expectedDate = expected.getString("businessDate");
return Objects.equals(actualDate, expectedDate);
}
}คำถามที่ 14: ทดสอบ job Spring Batch อย่างมีประสิทธิภาพอย่างไร
การทดสอบ job Spring Batch ต้องมีแนวทางหลายชั้น ได้แก่ unit test สำหรับ component (reader, processor, writer), integration test สำหรับ step และ end-to-end test สำหรับ job ทั้งหมด
// Unit test ของ processor
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {
@Mock
private PricingService pricingService;
@Mock
private ValidationService validationService;
@InjectMocks
private OrderItemProcessor processor;
@Test
void shouldProcessValidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(true);
when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));
// When
ProcessedOrder result = processor.process(input);
// Then
assertThat(result).isNotNull();
assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
}
@Test
void shouldFilterInvalidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(false);
// When
ProcessedOrder result = processor.process(input);
// Then - null หมายถึงถูกกรอง
assertThat(result).isNull();
verify(pricingService, never()).calculatePrice(any());
}
}// Integration test ของ job ทั้งหมด
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JobRepositoryTestUtils jobRepositoryTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeEach
void setup() {
// ล้าง metadata ระหว่างเทส
jobRepositoryTestUtils.removeJobExecutions();
// รีเซ็ตข้อมูลทดสอบ
jdbcTemplate.execute("DELETE FROM processed_orders");
jdbcTemplate.execute("DELETE FROM orders");
}
@Test
void shouldCompleteJobSuccessfully() throws Exception {
// Given - ข้อมูลทดสอบ
insertTestOrders(100);
// When
JobParameters params = new JobParametersBuilder()
.addLocalDate("businessDate", LocalDate.now())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// Then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
assertThat(countProcessedOrders()).isEqualTo(100);
}
@Test
void shouldHandleEmptyDataset() throws Exception {
// Given - ไม่มีข้อมูล
// When
JobExecution execution = jobLauncherTestUtils.launchJob();
// Then - job สำเร็จแม้ไม่มีข้อมูล
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
@Test
void shouldRestartFromFailurePoint() throws Exception {
// Given - จำลองข้อผิดพลาดกลางการประมวลผล
insertTestOrders(100);
insertPoisonOrder(50); // ทำให้เกิดข้อผิดพลาด
// When - การรันครั้งแรกล้มเหลว
JobExecution firstExecution = jobLauncherTestUtils.launchJob();
assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);
// แก้ข้อมูล
removePoisonOrder(50);
// When - restart
JobExecution restartExecution = jobLauncherTestUtils.launchJob(
firstExecution.getJobParameters());
// Then - ดำเนินต่อจากจุดที่ล้มเหลว
assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
private void insertTestOrders(int count) {
for (int i = 1; i <= count; i++) {
jdbcTemplate.update(
"INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
i, i * 10, BigDecimal.valueOf(i * 10));
}
}
private int countProcessedOrders() {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders", Integer.class);
}
}คำถามที่ 15: ปรับประสิทธิภาพการเขียนลงฐานข้อมูลอย่างไร
การเขียนมักเป็นคอขวด การปรับให้เหมาะสมรวมถึง JDBC batch insert ปิด constraint ระหว่างโหลด และใช้ตาราง staging
// Writer ที่ถูกปรับให้รองรับปริมาณสูง
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
private final DataSource dataSource;
public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
this.jdbcTemplate = jdbcTemplate;
this.dataSource = dataSource;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
List<? extends ProcessedOrder> items = chunk.getItems();
if (items.isEmpty()) {
return;
}
// ใช้ PreparedStatement แบบ batch
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
"VALUES (?, ?, ?, ?)")) {
for (ProcessedOrder order : items) {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
ps.addBatch();
}
// ดำเนินการ insert ทั้งหมดในการสื่อสารเครือข่ายเดียว
ps.executeBatch();
}
}
}
// StagingTableWriter.java
// รูปแบบ staging table สำหรับปริมาณมหาศาล
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {
private final JdbcTemplate jdbcTemplate;
private String stagingTable;
public StagingTableWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void beforeStep(StepExecution stepExecution) {
// สร้างตารางชั่วคราวสำหรับ step นี้
stagingTable = "staging_orders_" + stepExecution.getId();
jdbcTemplate.execute(
"CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// เขียนเข้า staging table (ไม่มี FK constraint)
String sql = "INSERT INTO " + stagingTable +
" (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
});
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
// คัดลอกแบบ bulk ไปยังตารางหลัก
jdbcTemplate.execute(
"INSERT INTO processed_orders SELECT * FROM " + stagingTable);
}
// ล้าง staging table
jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
return stepExecution.getExitStatus();
}
}บทสรุป
การเชี่ยวชาญ Spring Batch 5 ในการสัมภาษณ์ทางเทคนิคขึ้นอยู่กับความเข้าใจกลไกภายในอย่างลึกซึ้ง
✅ สถาปัตยกรรม: Job → Step → Chunk (Reader, Processor, Writer)
✅ การประมวลผลแบบ chunk: การกำหนดขนาด วงจรชีวิต ทรานแซกชัน
✅ Partitioning: local กับ remote การถ่วงดุล partition
✅ Fault tolerance: skip, retry, restart พร้อมนโยบายที่เหมาะสม
✅ การสเกล: multi-threading, parallel steps, remote chunking
✅ การทดสอบ: unit, integration, end-to-end
✅ การปรับแต่ง: batch writes, staging tables, การเฝ้าติดตาม
คำถามขั้นสูงประเมินความสามารถในการให้เหตุผลแก่การตัดสินใจทางสถาปัตยกรรมตามบริบท ได้แก่ ปริมาณข้อมูล ข้อจำกัดด้านเวลา ระดับความทนทานต่อข้อผิดพลาด และโครงสร้างพื้นฐานที่มี
เริ่มฝึกซ้อมเลย!
ทดสอบความรู้ของคุณด้วยตัวจำลองสัมภาษณ์และแบบทดสอบเทคนิคครับ
แท็ก
แชร์
บทความที่เกี่ยวข้อง

Spring Modulith: สถาปัตยกรรม Monolith แบบโมดูลาร์
เรียนรู้ Spring Modulith เพื่อสร้าง monolith แบบโมดูลาร์ใน Java สถาปัตยกรรม โมดูล อีเวนต์อะซิงโครนัส และการทดสอบด้วย Spring Boot 3

สัมภาษณ์ Spring Boot: การกระจายธุรกรรม
เชี่ยวชาญการกระจายธุรกรรมใน Spring Boot: REQUIRED, REQUIRES_NEW, NESTED และอื่น ๆ 12 คำถามสัมภาษณ์พร้อมโค้ดและกับดักทั่วไป

Spring Security 6: การยืนยันตัวตนด้วย JWT แบบครบถ้วน
คู่มือเชิงปฏิบัติในการนำการยืนยันตัวตนด้วย JWT มาใช้กับ Spring Security 6: การตั้งค่า การสร้างโทเคน การตรวจสอบ และแนวทางปฏิบัติที่ดีด้านความปลอดภัย