Phỏng vấn Spring Batch 5: Phân vùng, Chunk và Khả năng chịu lỗi
Chinh phục các buổi phỏng vấn Spring Batch 5: 15 câu hỏi cốt lõi về phân vùng, xử lý chunk và khả năng chịu lỗi với ví dụ Java 21.

Spring Batch 5 là nền tảng cho việc xử lý dữ liệu quy mô lớn trong hệ sinh thái Spring. Các buổi phỏng vấn kỹ thuật đánh giá khả năng thiết kế các job mạnh mẽ, có khả năng mở rộng và chịu lỗi. Việc thành thạo phân vùng (partitioning), xử lý theo chunk và các cơ chế chịu lỗi tạo nên sự khác biệt cho các lập trình viên cấp cao.
Nhà tuyển dụng kiểm tra hiểu biết sâu: tại sao chọn partitioning thay vì remote chunking? Làm sao để xác định kích thước chunk hợp lý? Những quyết định kiến trúc này phản ánh kinh nghiệm sản xuất thực tế.
Kiến trúc cốt lõi của Spring Batch 5
Câu hỏi 1: Các thành phần chính của Spring Batch là gì?
Kiến trúc Spring Batch gồm ba lớp: ứng dụng (job và mã nghiệp vụ), Batch Core (lớp runtime để khởi chạy và điều khiển job), và hạ tầng (reader, writer và các dịch vụ chung như RetryTemplate).
// Cấu hình job Spring Batch 5 với Java 21
@Configuration
public class BatchJobConfig {
// JobRepository lưu metadata thực thi
// Cho phép restart và giám sát job
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public BatchJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Job đóng gói toàn bộ tiến trình batch
// Bao gồm một hoặc nhiều Step được thực thi tuần tự
@Bean
public Job importUserJob(Step processUsersStep, Step cleanupStep) {
return new JobBuilder("importUserJob", jobRepository)
.start(processUsersStep) // Step xử lý chính
.next(cleanupStep) // Step dọn dẹp
.build();
}
// Step đại diện cho một đơn vị công việc độc lập
// Hai mô hình: Tasklet (tác vụ đơn) hoặc Chunk (xử lý lặp)
@Bean
public Step processUsersStep(ItemReader<UserRecord> reader,
ItemProcessor<UserRecord, User> processor,
ItemWriter<User> writer) {
return new StepBuilder("processUsersStep", jobRepository)
.<UserRecord, User>chunk(100, transactionManager) // Commit mỗi 100 phần tử
.reader(reader) // Đọc dữ liệu nguồn
.processor(processor) // Biến đổi từng phần tử
.writer(writer) // Ghi theo lô 100
.build();
}
}JobRepository lưu trạng thái thực thi vào cơ sở dữ liệu. Tính bền vững này cho phép khởi động lại một job thất bại tại đúng vị trí dừng, không cần xử lý lại dữ liệu đã commit.
Câu hỏi 2: Sự khác biệt giữa Tasklet và xử lý hướng Chunk là gì?
Tasklet thực hiện một hành động rời rạc, không lặp: xóa file, gọi stored procedure, gửi email thông báo. Chunk xử lý khối lượng lớn bằng cách chia dữ liệu thành các lô có thể quản lý.
// Tasklet: hành động đơn không lặp
@Component
public class CleanupTasklet implements Tasklet {
private final Path tempDirectory = Path.of("/tmp/batch-work");
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// Xóa toàn bộ file tạm của tiến trình xử lý
try (var files = Files.walk(tempDirectory)) {
files.filter(Files::isRegularFile)
.forEach(this::deleteQuietly);
}
// FINISHED báo hiệu tasklet đã hoàn thành
// CONTINUABLE sẽ chạy lại (hữu ích cho polling)
return RepeatStatus.FINISHED;
}
private void deleteQuietly(Path file) {
try {
Files.delete(file);
} catch (IOException e) {
// Ghi log và tiếp tục - không dừng job vì một file
}
}
}// Xử lý chunk: xử lý khối lượng lớn
@Configuration
public class ChunkProcessingConfig {
@Bean
public Step processOrdersStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("processOrdersStep", jobRepository)
// Chunk 500: đọc 500 phần tử, xử lý, ghi rồi commit
.<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Listener để giám sát tiến độ
.listener(new ChunkProgressListener())
.build();
}
}Xử lý hướng chunk mang lại các lợi ích quan trọng: quản lý bộ nhớ tối ưu (chỉ chunk hiện tại trong bộ nhớ), giao dịch chi tiết (commit theo chunk), và phục hồi sau lỗi tại chunk đã commit cuối cùng.
Đào sâu xử lý hướng Chunk
Câu hỏi 3: Vòng đời của một chunk hoạt động ra sao?
Mỗi chunk theo một chu trình chính xác: đọc từng phần tử cho đến khi đạt kích thước cấu hình, xử lý từng phần tử riêng biệt, sau đó ghi cả nhóm. Một giao dịch bao trọn cả chunk.
// ItemReader: đọc từng phần tử một
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {
// @StepScope: instance mới cho mỗi lần thực thi step
// Cho phép inject các tham số job động
@Value("#{jobParameters['startDate']}")
private LocalDate startDate;
private Iterator<OrderRecord> orderIterator;
@BeforeStep
public void initializeReader(StepExecution stepExecution) {
// Tải dữ liệu khi step bắt đầu
List<OrderRecord> orders = fetchOrdersFromDate(startDate);
this.orderIterator = orders.iterator();
}
@Override
public OrderRecord read() {
// Trả về null để báo kết thúc dữ liệu
// Spring Batch gọi read() cho đến khi nhận null
if (orderIterator.hasNext()) {
return orderIterator.next();
}
return null; // Kết thúc dataset
}
private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
// Lấy từ nguồn dữ liệu
return List.of(); // Triển khai thực tế
}
}// ItemProcessor: biến đổi từng phần tử riêng biệt
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {
private final PricingService pricingService;
private final ValidationService validationService;
public OrderItemProcessor(PricingService pricingService,
ValidationService validationService) {
this.pricingService = pricingService;
this.validationService = validationService;
}
@Override
public ProcessedOrder process(OrderRecord item) {
// Trả về null sẽ lọc phần tử (không được ghi)
if (!validationService.isValid(item)) {
return null; // Phần tử bị lọc
}
// Biến đổi nghiệp vụ
BigDecimal finalPrice = pricingService.calculatePrice(item);
return new ProcessedOrder(
item.orderId(),
item.customerId(),
finalPrice,
LocalDateTime.now()
);
}
}// ItemWriter: ghi toàn bộ chunk trong một thao tác
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
public OrderItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Chunk chứa toàn bộ phần tử đã xử lý
// Ghi theo lô để tối ưu hiệu năng
List<? extends ProcessedOrder> items = chunk.getItems();
jdbcTemplate.batchUpdate(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
items,
items.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
}
);
}
}Nếu xảy ra ngoại lệ trong khi xử lý chunk, giao dịch được rollback. Job sau đó có thể tiếp tục từ chunk này nhờ metadata lưu trong JobRepository.
Câu hỏi 4: Làm sao chọn kích thước chunk tối ưu?
Kích thước chunk ảnh hưởng trực tiếp đến hiệu năng và mức tiêu thụ bộ nhớ. Chunk quá nhỏ làm tăng số lần commit (overhead). Chunk quá lớn tiêu tốn bộ nhớ và kéo dài rollback khi có lỗi.
// Cấu hình kích thước chunk động
@Configuration
public class ChunkSizingConfig {
// Mặc định hợp lý cho hầu hết trường hợp
private static final int DEFAULT_CHUNK_SIZE = 100;
// Cho phần tử nhẹ (ít trường)
private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;
// Cho phần tử nặng (blob, tài liệu)
private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;
@Bean
public Step processLightDataStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<LightRecord> reader,
ItemWriter<LightRecord> writer) {
return new StepBuilder("processLightDataStep", jobRepository)
// Phần tử nhẹ: chunk lớn hơn để giảm commit
.<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Step processDocumentsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<Document> reader,
ItemProcessor<Document, ProcessedDocument> processor,
ItemWriter<ProcessedDocument> writer) {
return new StepBuilder("processDocumentsStep", jobRepository)
// Tài liệu nặng: chunk nhỏ hơn để hạn chế bộ nhớ
.<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}Bắt đầu với 100 phần tử mỗi chunk, sau đó điều chỉnh dựa trên các chỉ số: thời gian commit, mức dùng bộ nhớ và thời gian rollback. Dùng listener để xác định điểm tối ưu.
Phân vùng cho xử lý song song
Câu hỏi 5: Phân vùng là gì và khi nào nên dùng?
Phân vùng chia dataset thành các phân vùng độc lập được xử lý song song. Mỗi phân vùng chạy trong thread riêng (cục bộ) hoặc trên worker từ xa. Cách tiếp cận này nhân throughput mà không hy sinh khả năng restart.
// Cấu hình job có phân vùng
@Configuration
public class PartitionedJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public PartitionedJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
@Bean
public Job partitionedImportJob(Step partitionedStep) {
return new JobBuilder("partitionedImportJob", jobRepository)
.start(partitionedStep)
.build();
}
// Step manager: điều phối các phân vùng
@Bean
public Step partitionedStep(Partitioner partitioner,
Step workerStep,
TaskExecutor taskExecutor) {
return new StepBuilder("partitionedStep", jobRepository)
// Chia công việc qua Partitioner
.partitioner("workerStep", partitioner)
// Step thực thi cho mỗi phân vùng
.step(workerStep)
// 8 luồng song song
.taskExecutor(taskExecutor)
// Số phân vùng cần tạo
.gridSize(8)
.build();
}
// TaskExecutor cho thực thi song song
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("batch-partition-");
executor.initialize();
return executor;
}
}// Partitioner dựa trên khoảng ID
@Component
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Lấy biên của dataset
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);
if (minId == null || maxId == null) {
return Map.of(); // Không có dữ liệu để xử lý
}
// Tính kích thước mỗi phân vùng
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long start = minId + (i * range);
long end = Math.min(start + range - 1, maxId);
// Mỗi phân vùng nhận các biên của mình
context.putLong("minId", start);
context.putLong("maxId", end);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}Phân vùng phù hợp với dataset lớn mà các phần tử độc lập với nhau. Phân vùng phải cân bằng để tránh một phân vùng chậm làm chậm cả job.
Câu hỏi 6: Sự khác biệt giữa phân vùng cục bộ và từ xa là gì?
Phân vùng cục bộ chạy mọi phân vùng trên cùng JVM với một thread pool. Phân vùng từ xa phân phối phân vùng qua nhiều JVM (worker) thông qua middleware messaging.
// Cấu hình phân vùng từ xa với messaging
@Configuration
public class RemotePartitioningConfig {
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
MessageChannelPartitionHandler partitionHandler) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
// Handler giao tiếp với worker từ xa
.partitionHandler(partitionHandler)
.build();
}
// PartitionHandler gửi ExecutionContext tới worker
@Bean
public MessageChannelPartitionHandler partitionHandler(
MessagingTemplate messagingTemplate,
JobExplorer jobExplorer) {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("workerStep");
handler.setGridSize(4);
handler.setMessagingOperations(messagingTemplate);
handler.setJobExplorer(jobExplorer);
// Timeout chờ worker hoàn tất
handler.setPollInterval(5000L);
return handler;
}
}// Cấu hình phía worker
@Configuration
public class WorkerConfiguration {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public WorkerConfiguration(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Worker nhận phân vùng và thực thi step
@Bean
public Step workerStep(ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("workerStep", jobRepository)
.<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
// Reader cấu hình @StepScope để nhận tham số phân vùng
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// Reader sử dụng biên của phân vùng
@Bean
@StepScope
public JdbcCursorItemReader<OrderRecord> partitionedReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcCursorItemReaderBuilder<OrderRecord>()
.name("partitionedOrderReader")
.dataSource(dataSource)
.sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
.preparedStatementSetter(ps -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.rowMapper(new OrderRecordRowMapper())
.build();
}
}Sẵn sàng chinh phục phỏng vấn Spring Boot?
Luyện tập với mô phỏng tương tác, flashcards và bài kiểm tra kỹ thuật.
Khả năng chịu lỗi và phục hồi sau lỗi
Câu hỏi 7: Spring Batch cung cấp những cơ chế chịu lỗi nào?
Spring Batch cung cấp ba cơ chế bổ trợ: skip (bỏ qua phần tử lỗi), retry (tự động thử lại) và restart (tiếp tục một job thất bại). Các cơ chế này cấu hình ở cấp step.
// Cấu hình chịu lỗi đầy đủ
@Configuration
public class FaultTolerantStepConfig {
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<DataRecord> reader,
ItemProcessor<DataRecord, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
SkipPolicy customSkipPolicy) {
return new StepBuilder("faultTolerantStep", jobRepository)
.<DataRecord, ProcessedRecord>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// Bật chế độ chịu lỗi
.faultTolerant()
// SKIP: bỏ qua tối đa 10 lỗi xác thực
.skipLimit(10)
.skip(ValidationException.class)
.skip(DataIntegrityViolationException.class)
// Một số lỗi không bao giờ được bỏ qua
.noSkip(FatalBatchException.class)
// RETRY: thử lại với lỗi tạm thời
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(DeadlockLoserDataAccessException.class)
// Backoff theo cấp số nhân giữa các lần thử
.backOffPolicy(exponentialBackOffPolicy())
// Listener ghi log skip
.listener(skipListener())
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000); // 1 giây
policy.setMultiplier(2.0); // Gấp đôi mỗi lần thử
policy.setMaxInterval(10000); // Tối đa 10 giây
return policy;
}
@Bean
public SkipListener<DataRecord, ProcessedRecord> skipListener() {
return new SkipListener<>() {
@Override
public void onSkipInRead(Throwable t) {
// Ghi log phần tử không đọc được
}
@Override
public void onSkipInProcess(DataRecord item, Throwable t) {
// Ghi log phần tử lỗi khi xử lý
}
@Override
public void onSkipInWrite(ProcessedRecord item, Throwable t) {
// Ghi log phần tử lỗi khi ghi
}
};
}
}Retry phù hợp với lỗi tạm thời (timeout mạng, deadlock cơ sở dữ liệu). Skip phù hợp với lỗi dữ liệu cá nhân không nên cản trở toàn bộ tiến trình.
Câu hỏi 8: Cách triển khai SkipPolicy tùy biến?
SkipPolicy tùy biến cho phép logic quyết định tinh vi: bỏ qua theo loại exception, số lần lỗi hoặc tiêu chí nghiệp vụ cụ thể.
// SkipPolicy với logic nghiệp vụ nâng cao
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 100;
private static final double MAX_SKIP_PERCENTAGE = 0.05; // Tối đa 5%
private final AtomicInteger totalProcessed = new AtomicInteger(0);
private final AtomicInteger skipCount = new AtomicInteger(0);
@Override
public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
// Không bao giờ bỏ qua lỗi nghiêm trọng
if (exception instanceof FatalBatchException
|| exception instanceof OutOfMemoryError) {
return false;
}
// Giới hạn skip tuyệt đối
if (skipCountSoFar >= MAX_SKIP_COUNT) {
return false; // Dừng job
}
// Giới hạn theo phần trăm
int total = totalProcessed.get();
if (total > 1000) { // Chỉ áp dụng sau khi khởi động
double skipPercentage = (double) skipCountSoFar / total;
if (skipPercentage > MAX_SKIP_PERCENTAGE) {
return false; // Tỷ lệ lỗi quá cao
}
}
// Bỏ qua lỗi xác thực và dữ liệu
return exception instanceof ValidationException
|| exception instanceof DataFormatException
|| exception instanceof IllegalArgumentException;
}
// Được listener gọi để theo dõi tiến độ
public void incrementProcessed() {
totalProcessed.incrementAndGet();
}
}Câu hỏi 9: Restart một job thất bại hoạt động ra sao?
JobRepository lưu trạng thái mỗi lần thực thi. Khi restart, Spring Batch xác định chunk được commit gần nhất và tiếp tục từ đó. Các phần tử đã xử lý thành công không bị xử lý lại.
// Dịch vụ quản lý restart job
@Service
public class JobRestartService {
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final Job importJob;
public JobRestartService(JobLauncher jobLauncher,
JobExplorer jobExplorer,
JobRepository jobRepository,
@Qualifier("importJob") Job importJob) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.importJob = importJob;
}
public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
// Lấy lượt thực thi đã thất bại
JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);
if (failedExecution == null) {
throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
}
// Kiểm tra job có thể restart
if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
throw new IllegalStateException("Only FAILED jobs can be restarted");
}
// Dùng đúng tham số như lần thực thi gốc
JobParameters originalParams = failedExecution.getJobParameters();
// Khởi chạy lại job - tự động tiếp tục từ checkpoint cuối
return jobLauncher.run(importJob, originalParams);
}
public List<JobExecution> findRestartableJobs() {
// Liệt kê các lần thực thi FAILED chưa được restart
return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.FAILED)
.filter(this::isRestartable)
.toList();
}
private boolean isRestartable(JobExecution execution) {
// Đảm bảo không có lượt thực thi thành công mới hơn
JobInstance instance = execution.getJobInstance();
return jobExplorer.getJobExecutions(instance).stream()
.noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
}
}Một job chỉ restart được khi JobParameters giống hệt nhau. Thay đổi tham số sẽ tạo instance job mới và mất lịch sử tiến độ.
Mở rộng và tối ưu
Câu hỏi 10: Có những chiến lược mở rộng nào?
Spring Batch cung cấp bốn chiến lược: multi-threaded step (nhiều luồng đọc song song), parallel steps (step độc lập song song), remote chunking (xử lý phân tán) và partitioning (dữ liệu phân tán).
// Step đa luồng: nhiều luồng xử lý cùng dataset
@Configuration
public class MultiThreadedStepConfig {
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<Record> reader,
ItemProcessor<Record, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
TaskExecutor taskExecutor) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Record, ProcessedRecord>chunk(100, transactionManager)
// CHÚ Ý: reader phải thread-safe
.reader(synchronizedReader(reader))
.processor(processor)
.writer(writer)
// 4 luồng xử lý chunk song song
.taskExecutor(taskExecutor)
.throttleLimit(4)
.build();
}
// Wrapper để biến reader thành thread-safe
private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate((ItemStreamReader<Record>) reader);
return syncReader;
}
}// Thực thi các step độc lập song song
@Configuration
public class ParallelStepsConfig {
@Bean
public Job parallelJob(JobRepository jobRepository,
Step loadCustomersStep,
Step loadProductsStep,
Step loadOrdersStep,
Step processDataStep) {
// Luồng song song: customers và products tải đồng thời
Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
.start(loadCustomersStep)
.build();
Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
.start(loadProductsStep)
.build();
Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
.start(loadOrdersStep)
.build();
// Split chạy các flow song song
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("parallelLoadFlow")
.split(new SimpleAsyncTaskExecutor())
.add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
.build())
// Sau khi tải song song, xử lý tuần tự
.next(processDataStep)
.build()
.build();
}
}Multi-threading phù hợp khi reader có thể được đồng bộ. Phân vùng được ưu tiên cho khối lượng lớn vì mỗi phân vùng có reader riêng, không tranh chấp.
Câu hỏi 11: Làm sao giám sát hiệu năng job?
Spring Batch cung cấp các metric qua listener và JobRepository. Tích hợp với Micrometer cho phép xuất sang Prometheus, Grafana hoặc các hệ thống giám sát khác.
// Cấu hình giám sát với Micrometer
@Configuration
public class BatchMetricsConfig {
private final MeterRegistry meterRegistry;
public BatchMetricsConfig(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Bean
public JobExecutionListener metricsJobListener() {
return new JobExecutionListener() {
private Timer.Sample jobTimer;
@Override
public void beforeJob(JobExecution jobExecution) {
// Bắt đầu đo thời gian job
jobTimer = Timer.start(meterRegistry);
Counter.builder("batch.job.started")
.tag("job", jobExecution.getJobInstance().getJobName())
.register(meterRegistry)
.increment();
}
@Override
public void afterJob(JobExecution jobExecution) {
// Ghi nhận tổng thời gian
jobTimer.stop(Timer.builder("batch.job.duration")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry));
// Bộ đếm job theo trạng thái
Counter.builder("batch.job.completed")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry)
.increment();
}
};
}
@Bean
public StepExecutionListener metricsStepListener() {
return new StepExecutionListener() {
@Override
public void afterStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
String stepName = stepExecution.getStepName();
// Các chỉ số throughput
Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
return null;
}
};
}
}Câu hỏi 12: Những bẫy thường gặp khi phân vùng?
Những sai lầm phổ biến: phân vùng không cân bằng (một phân vùng giữ 90% dữ liệu), reader không thread-safe và quản lý trạng thái sai giữa các phân vùng.
// Partitioner thực sự cân bằng tải
@Component
public class BalancedPartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// Đếm tổng phần tử cần xử lý
Integer totalCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);
if (totalCount == null || totalCount == 0) {
return Map.of();
}
// Tính kích thước mục tiêu cho mỗi phân vùng
int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);
Map<String, ExecutionContext> partitions = new HashMap<>();
// Dùng OFFSET/LIMIT cho phân vùng cân bằng
// Tốn kém hơn ranges nhưng đảm bảo cân bằng
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("offset", i * itemsPerPartition);
context.putInt("limit", itemsPerPartition);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}
// OffsetBasedReader.java
// Reader tương thích với phân vùng dựa trên offset
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {
private final JdbcTemplate jdbcTemplate;
private Iterator<OrderRecord> iterator;
@Value("#{stepExecutionContext['offset']}")
private int offset;
@Value("#{stepExecutionContext['limit']}")
private int limit;
public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void open(ExecutionContext executionContext) {
// Tải đúng phần đã giao cho phân vùng này
List<OrderRecord> records = jdbcTemplate.query(
"SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
new OrderRecordRowMapper(),
limit, offset
);
this.iterator = records.iterator();
}
@Override
public OrderRecord read() {
return iterator.hasNext() ? iterator.next() : null;
}
@Override
public void update(ExecutionContext executionContext) {
// Lưu trạng thái cho restart nếu cần
}
@Override
public void close() {
// Dọn dẹp
}
}Câu hỏi nâng cao cho cấp senior
Câu hỏi 13: Quản lý phụ thuộc giữa các job ra sao?
Spring Batch không quản lý phụ thuộc giữa các job theo mặc định. Giải pháp: orchestrator bên ngoài (Airflow, Kubernetes CronJob) hoặc triển khai tùy biến với JobExplorer.
// Quản lý phụ thuộc giữa các job
@Service
public class JobDependencyService {
private final JobExplorer jobExplorer;
private final JobLauncher jobLauncher;
private final Map<String, Job> jobs;
public JobDependencyService(JobExplorer jobExplorer,
JobLauncher jobLauncher,
Map<String, Job> jobs) {
this.jobExplorer = jobExplorer;
this.jobLauncher = jobLauncher;
this.jobs = jobs;
}
public JobExecution runWithDependencies(String jobName,
JobParameters params,
List<String> dependsOn) throws Exception {
// Kiểm tra mọi phụ thuộc đã thành công
for (String dependency : dependsOn) {
if (!hasSuccessfulExecution(dependency, params)) {
throw new JobExecutionException(
"Dependency not satisfied: " + dependency);
}
}
Job job = jobs.get(jobName);
if (job == null) {
throw new IllegalArgumentException("Unknown job: " + jobName);
}
return jobLauncher.run(job, params);
}
private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
// Tìm lượt thực thi COMPLETED với cùng tham số nghiệp vụ
return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
.anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
}
private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
// So sánh tham số nghiệp vụ (bỏ qua timestamp thực thi)
String actualDate = actual.getString("businessDate");
String expectedDate = expected.getString("businessDate");
return Objects.equals(actualDate, expectedDate);
}
}Câu hỏi 14: Kiểm thử job Spring Batch hiệu quả ra sao?
Kiểm thử job Spring Batch cần cách tiếp cận theo lớp: unit test cho thành phần (reader, processor, writer), integration test cho step, và test end-to-end cho job đầy đủ.
// Unit test processor
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {
@Mock
private PricingService pricingService;
@Mock
private ValidationService validationService;
@InjectMocks
private OrderItemProcessor processor;
@Test
void shouldProcessValidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(true);
when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));
// When
ProcessedOrder result = processor.process(input);
// Then
assertThat(result).isNotNull();
assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
}
@Test
void shouldFilterInvalidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(false);
// When
ProcessedOrder result = processor.process(input);
// Then - null nghĩa là đã lọc
assertThat(result).isNull();
verify(pricingService, never()).calculatePrice(any());
}
}// Integration test cho job đầy đủ
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JobRepositoryTestUtils jobRepositoryTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeEach
void setup() {
// Dọn metadata giữa các test
jobRepositoryTestUtils.removeJobExecutions();
// Reset dữ liệu test
jdbcTemplate.execute("DELETE FROM processed_orders");
jdbcTemplate.execute("DELETE FROM orders");
}
@Test
void shouldCompleteJobSuccessfully() throws Exception {
// Given - dữ liệu test
insertTestOrders(100);
// When
JobParameters params = new JobParametersBuilder()
.addLocalDate("businessDate", LocalDate.now())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// Then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
assertThat(countProcessedOrders()).isEqualTo(100);
}
@Test
void shouldHandleEmptyDataset() throws Exception {
// Given - không dữ liệu
// When
JobExecution execution = jobLauncherTestUtils.launchJob();
// Then - job vẫn thành công dù không có dữ liệu
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
@Test
void shouldRestartFromFailurePoint() throws Exception {
// Given - mô phỏng lỗi giữa quá trình xử lý
insertTestOrders(100);
insertPoisonOrder(50); // Gây lỗi
// When - lần thực thi đầu thất bại
JobExecution firstExecution = jobLauncherTestUtils.launchJob();
assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);
// Sửa dữ liệu
removePoisonOrder(50);
// When - restart
JobExecution restartExecution = jobLauncherTestUtils.launchJob(
firstExecution.getJobParameters());
// Then - tiếp tục từ điểm lỗi
assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
private void insertTestOrders(int count) {
for (int i = 1; i <= count; i++) {
jdbcTemplate.update(
"INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
i, i * 10, BigDecimal.valueOf(i * 10));
}
}
private int countProcessedOrders() {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders", Integer.class);
}
}Câu hỏi 15: Tối ưu hiệu năng ghi vào cơ sở dữ liệu ra sao?
Ghi thường trở thành nút thắt. Các tối ưu gồm: JDBC batch insert, vô hiệu hóa constraint trong khi tải và sử dụng bảng staging.
// Writer tối ưu cho khối lượng lớn
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
private final DataSource dataSource;
public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
this.jdbcTemplate = jdbcTemplate;
this.dataSource = dataSource;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
List<? extends ProcessedOrder> items = chunk.getItems();
if (items.isEmpty()) {
return;
}
// Sử dụng PreparedStatement với batch
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
"VALUES (?, ?, ?, ?)")) {
for (ProcessedOrder order : items) {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
ps.addBatch();
}
// Thực thi tất cả insert trong một thao tác mạng
ps.executeBatch();
}
}
}
// StagingTableWriter.java
// Mẫu staging table cho khối lượng rất lớn
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {
private final JdbcTemplate jdbcTemplate;
private String stagingTable;
public StagingTableWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void beforeStep(StepExecution stepExecution) {
// Tạo bảng tạm cho step này
stagingTable = "staging_orders_" + stepExecution.getId();
jdbcTemplate.execute(
"CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// Ghi vào bảng staging (không có FK constraint)
String sql = "INSERT INTO " + stagingTable +
" (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
});
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
// Bulk copy sang bảng đích
jdbcTemplate.execute(
"INSERT INTO processed_orders SELECT * FROM " + stagingTable);
}
// Dọn bảng staging
jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
return stepExecution.getExitStatus();
}
}Kết luận
Thành thạo Spring Batch 5 trong các buổi phỏng vấn dựa trên hiểu biết sâu về cơ chế nội tại:
✅ Kiến trúc: Job → Step → Chunk (Reader, Processor, Writer)
✅ Xử lý chunk: kích thước, vòng đời, giao dịch
✅ Phân vùng: cục bộ và từ xa, cân bằng phân vùng
✅ Khả năng chịu lỗi: skip, retry, restart với chính sách phù hợp
✅ Mở rộng: multi-threading, parallel steps, remote chunking
✅ Kiểm thử: unit, integration, end-to-end
✅ Tối ưu: batch writes, staging table, giám sát
Các câu hỏi nâng cao đánh giá khả năng biện luận cho lựa chọn kiến trúc theo bối cảnh: khối lượng dữ liệu, ràng buộc thời gian, mức chấp nhận lỗi và hạ tầng hiện có.
Bắt đầu luyện tập!
Kiểm tra kiến thức với mô phỏng phỏng vấn và bài kiểm tra kỹ thuật.
Thẻ
Chia sẻ
Bài viết liên quan

Spring Modulith: Kiến trúc Monolith Mô-đun Giải thích
Học Spring Modulith để xây dựng monolith mô-đun trong Java. Kiến trúc, mô-đun, sự kiện bất đồng bộ và testing với ví dụ Spring Boot 3.

Phỏng vấn Spring Boot: Lan truyền Giao dịch
Làm chủ lan truyền giao dịch Spring Boot: REQUIRED, REQUIRES_NEW, NESTED và hơn thế. 12 câu hỏi phỏng vấn với mã ví dụ và bẫy thường gặp.

Spring Security 6: Xác Thực JWT Toàn Diện
Hướng dẫn thực tế triển khai xác thực JWT với Spring Security 6: cấu hình, sinh token, kiểm tra hợp lệ và các thực hành bảo mật tốt nhất.