Spring Batch 5 ๋ฉด์ : ํํฐ์ ๋, ์ฒญํฌ, ์ฅ์ ํ์ฉ
Spring Batch 5 ๋ฉด์ ์ ์ ๋ณตํ์ธ์. ํํฐ์ ๋, ์ฒญํฌ ์ฒ๋ฆฌ, ์ฅ์ ํ์ฉ์ ๊ดํ 15๊ฐ์ง ํต์ฌ ์ง๋ฌธ๊ณผ Java 21 ์์ ๋ฅผ ์ ๊ณตํฉ๋๋ค.

Spring Batch 5๋ Spring ์ํ๊ณ์ ๋๊ท๋ชจ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ํต์ฌ์ ๋๋ค. ๊ธฐ์ ๋ฉด์ ์์๋ ๊ฒฌ๊ณ ํ๊ณ ํ์ฅ ๊ฐ๋ฅํ๋ฉฐ ์ฅ์ ์ ๊ฐํ ์ก์ ์ค๊ณํ๋ ๋ฅ๋ ฅ์ ํ๊ฐํฉ๋๋ค. ํํฐ์ ๋, ์ฒญํฌ ์งํฅ ์ฒ๋ฆฌ, ์ฅ์ ํ์ฉ ๋ฉ์ปค๋์ฆ์ ๋ํ ์๋ จ๋๊ฐ ์๋์ด ๊ฐ๋ฐ์๋ฅผ ๋๋๋ฌ์ง๊ฒ ํฉ๋๋ค.
์ฑ์ฉ ๋ด๋น์๋ ๊น์ ์ดํด๋ฅผ ์ํํฉ๋๋ค. ์ remote chunking ๋์ partitioning์ ์ ํํด์ผ ํฉ๋๊น? ์ฒญํฌ ํฌ๊ธฐ๋ฅผ ์ด๋ป๊ฒ ์ฐ์ ํด์ผ ํฉ๋๊น? ์ด๋ฌํ ์ํคํ ์ฒ ๊ฒฐ์ ์ ์ค์ ํ๋ก๋์ ๊ฒฝํ์ ๋๋ฌ๋ ๋๋ค.
Spring Batch 5์ ํต์ฌ ์ํคํ ์ฒ
์ง๋ฌธ 1: Spring Batch์ ์ฃผ์ ๊ตฌ์ฑ ์์๋ ๋ฌด์์ ๋๊น
Spring Batch ์ํคํ ์ฒ๋ ์ธ ๊ณ์ธต์ผ๋ก ๊ตฌ์ฑ๋ฉ๋๋ค. ์ ํ๋ฆฌ์ผ์ด์ (์ก๊ณผ ๋น์ฆ๋์ค ์ฝ๋), Batch Core(์ก์ ์คํํ๊ณ ์ ์ดํ๋ ๋ฐํ์ ํด๋์ค), ์ธํ๋ผ(reader, writer, RetryTemplate ๊ฐ์ ๊ณต์ฉ ์๋น์ค)์ ๋๋ค.
// Java 21 ๊ธฐ๋ฐ Spring Batch 5 ์ก ์ค์
@Configuration
public class BatchJobConfig {
// JobRepository๋ ์คํ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํฉ๋๋ค
// ์ก ์ฌ์์๊ณผ ๋ชจ๋ํฐ๋ง์ ๊ฐ๋ฅํ๊ฒ ํฉ๋๋ค
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public BatchJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// Job์ ์ ์ฒด ๋ฐฐ์น ํ๋ก์ธ์ค๋ฅผ ์บก์ํํฉ๋๋ค
// ์์ฐจ์ ์ผ๋ก ์คํ๋๋ ํ๋ ์ด์์ Step์ผ๋ก ๊ตฌ์ฑ๋ฉ๋๋ค
@Bean
public Job importUserJob(Step processUsersStep, Step cleanupStep) {
return new JobBuilder("importUserJob", jobRepository)
.start(processUsersStep) // ๋ฉ์ธ ์ฒ๋ฆฌ Step
.next(cleanupStep) // ์ ๋ฆฌ Step
.build();
}
// Step์ ๋
๋ฆฝ๋ ์์
๋จ์๋ฅผ ๋ํ๋
๋๋ค
// ๋ ๊ฐ์ง ๋ชจ๋ธ: Tasklet(๋จ์ผ ์์
) ๋๋ Chunk(๋ฐ๋ณต ์ฒ๋ฆฌ)
@Bean
public Step processUsersStep(ItemReader<UserRecord> reader,
ItemProcessor<UserRecord, User> processor,
ItemWriter<User> writer) {
return new StepBuilder("processUsersStep", jobRepository)
.<UserRecord, User>chunk(100, transactionManager) // 100๊ฑด๋ง๋ค ์ปค๋ฐ
.reader(reader) // ์๋ณธ ๋ฐ์ดํฐ๋ฅผ ์ฝ์
.processor(processor) // ๊ฐ ์์ดํ
์ ๋ณํ
.writer(writer) // 100๊ฑด ๋จ์๋ก ๊ธฐ๋ก
.build();
}
}JobRepository๋ ์คํ ์ํ๋ฅผ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์์ํํฉ๋๋ค. ์ด๋ฌํ ์์์ฑ ๋๋ถ์ ์คํจํ ์ก์ ์ ํํ ๋ฉ์ถ ์ง์ ๋ถํฐ ์ฌ์์ํ ์ ์๊ณ , ์ด๋ฏธ ์ปค๋ฐ๋ ๋ฐ์ดํฐ๋ฅผ ๋ค์ ์ฒ๋ฆฌํ ํ์๊ฐ ์์ต๋๋ค.
์ง๋ฌธ 2: Tasklet๊ณผ ์ฒญํฌ ์งํฅ ์ฒ๋ฆฌ์ ์ฐจ์ด๋ ๋ฌด์์ ๋๊น
Tasklet์ ๋ฐ๋ณตํ์ง ์๋ ๋จ์ผ ๋์์ ์ํํฉ๋๋ค. ํ์ผ ์ญ์ , ์ ์ฅ ํ๋ก์์ ํธ์ถ, ์๋ฆผ ๋ฉ์ผ ๋ฐ์ก ๋ฑ์ด ๊ทธ ์์ ๋๋ค. Chunk๋ ๋ฐ์ดํฐ๋ฅผ ๊ด๋ฆฌ ๊ฐ๋ฅํ ๋ฐฐ์น๋ก ๋๋์ด ๋์ฉ๋์ ์ฒ๋ฆฌํฉ๋๋ค.
// Tasklet: ๋ฐ๋ณต ์๋ ๋จ์ผ ๋์
@Component
public class CleanupTasklet implements Tasklet {
private final Path tempDirectory = Path.of("/tmp/batch-work");
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// ์ฒ๋ฆฌ ๊ณผ์ ์์ ์๊ธด ์์ ํ์ผ์ ๋ชจ๋ ์ญ์
try (var files = Files.walk(tempDirectory)) {
files.filter(Files::isRegularFile)
.forEach(this::deleteQuietly);
}
// FINISHED๋ tasklet์ด ์์
์ ์๋ฃํ์์ ์๋ฏธ
// CONTINUABLE์ ์คํ์ ๋ค์ ์์(ํด๋ง์ ์ ์ฉ)
return RepeatStatus.FINISHED;
}
private void deleteQuietly(Path file) {
try {
Files.delete(file);
} catch (IOException e) {
// ๋ก๊ทธ๋ง ๋จ๊ธฐ๊ณ ๊ณ์ - ํ์ผ ํ๋๋ก ์ก์ ์คํจ์ํค์ง ์์
}
}
}// ์ฒญํฌ ์ฒ๋ฆฌ: ๋์ฉ๋ ์ฒ๋ฆฌ
@Configuration
public class ChunkProcessingConfig {
@Bean
public Step processOrdersStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("processOrdersStep", jobRepository)
// 500๊ฐ ์ฒญํฌ: 500๊ฑด ์ฝ๊ธฐ, ์ฒ๋ฆฌ, ๊ธฐ๋ก ํ ์ปค๋ฐ
.<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// ์งํ ์ํฉ ๋ชจ๋ํฐ๋ง์ฉ ๋ฆฌ์ค๋
.listener(new ChunkProgressListener())
.build();
}
}์ฒญํฌ ์งํฅ ์ฒ๋ฆฌ๋ ์ค์ํ ์ด์ ์ ์ ๊ณตํฉ๋๋ค. ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ ์ต์ ํ(ํ์ฌ ์ฒญํฌ๋ง ๋ฉ๋ชจ๋ฆฌ์ ๋ณด๊ด), ์ธ๋ถํ๋ ํธ๋์ญ์ (์ฒญํฌ ๋จ์ ์ปค๋ฐ), ๋ง์ง๋ง ์ปค๋ฐ๋ ์ฒญํฌ์์์ ์ฅ์ ๋ณต๊ตฌ์ ๋๋ค.
์ฒญํฌ ์งํฅ ์ฒ๋ฆฌ ์ฌํ
์ง๋ฌธ 3: ์ฒญํฌ ๋ผ์ดํ์ฌ์ดํด์ ์ด๋ป๊ฒ ๋์ํฉ๋๊น
๊ฐ ์ฒญํฌ๋ ์ ํํ ์ฌ์ดํด์ ๋ฐ๋ฆ ๋๋ค. ์ค์ ํ ํฌ๊ธฐ์ ๋๋ฌํ ๋๊น์ง ์์ดํ ์ ํ ๊ฑด์ฉ ์ฝ๊ณ , ๊ฐ ์์ดํ ์ ๊ฐ๋ณ์ ์ผ๋ก ์ฒ๋ฆฌํ ๋ค, ๊ทธ๋ฃน์ ํ ๋ฒ์ ๊ธฐ๋กํฉ๋๋ค. ํธ๋์ญ์ ์ด ์ฒญํฌ ์ ์ฒด๋ฅผ ๊ฐ์๋๋ค.
// ItemReader: ํ ๋ฒ์ ํ ์์ดํ
์ฉ ์ฝ์
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {
// @StepScope: Step ์คํ๋ง๋ค ์ ์ธ์คํด์ค
// ๋์ ์ธ ์ก ํ๋ผ๋ฏธํฐ ์ฃผ์
์ ๊ฐ๋ฅํ๊ฒ ํจ
@Value("#{jobParameters['startDate']}")
private LocalDate startDate;
private Iterator<OrderRecord> orderIterator;
@BeforeStep
public void initializeReader(StepExecution stepExecution) {
// Step ์์ ์ ๋ฐ์ดํฐ ๋ก๋ฉ
List<OrderRecord> orders = fetchOrdersFromDate(startDate);
this.orderIterator = orders.iterator();
}
@Override
public OrderRecord read() {
// ๋ฐ์ดํฐ ๋์ ์๋ฆฌ๊ธฐ ์ํด null ๋ฐํ
// Spring Batch๋ null์ ๋ฐ์ ๋๊น์ง read()๋ฅผ ํธ์ถ
if (orderIterator.hasNext()) {
return orderIterator.next();
}
return null; // ๋ฐ์ดํฐ์
์ ๋
}
private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
// ๋ฐ์ดํฐ ์์ค์์ ์กฐํ
return List.of(); // ์ค์ ๊ตฌํ
}
}// ItemProcessor: ๊ฐ ์์ดํ
์ ๊ฐ๋ณ์ ์ผ๋ก ๋ณํ
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {
private final PricingService pricingService;
private final ValidationService validationService;
public OrderItemProcessor(PricingService pricingService,
ValidationService validationService) {
this.pricingService = pricingService;
this.validationService = validationService;
}
@Override
public ProcessedOrder process(OrderRecord item) {
// null ๋ฐํ์ ์์ดํ
์ ํํฐ๋ง(๊ธฐ๋ก๋์ง ์์)
if (!validationService.isValid(item)) {
return null; // ์์ดํ
ํํฐ๋ง
}
// ๋น์ฆ๋์ค ๋ณํ
BigDecimal finalPrice = pricingService.calculatePrice(item);
return new ProcessedOrder(
item.orderId(),
item.customerId(),
finalPrice,
LocalDateTime.now()
);
}
}// ItemWriter: ์ฒญํฌ ์ ์ฒด๋ฅผ ํ ๋ฒ์ ๊ธฐ๋ก
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
public OrderItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// ์ฒญํฌ๋ ์ฒ๋ฆฌ๋ ๋ชจ๋ ์์ดํ
์ ํฌํจ
// ์ฑ๋ฅ ์ต์ ํ๋ฅผ ์ํ ๋ฐฐ์น ๊ธฐ๋ก
List<? extends ProcessedOrder> items = chunk.getItems();
jdbcTemplate.batchUpdate(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
items,
items.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
}
);
}
}์ฒญํฌ ์ฒ๋ฆฌ ์ค ์์ธ๊ฐ ๋ฐ์ํ๋ฉด ํธ๋์ญ์ ์ ๋กค๋ฐฑ๋ฉ๋๋ค. ์ดํ ์ก์ JobRepository์ ์ ์ฅ๋ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํด ํด๋น ์ฒญํฌ๋ถํฐ ์ฌ๊ฐํ ์ ์์ต๋๋ค.
์ง๋ฌธ 4: ์ต์ ์ ์ฒญํฌ ํฌ๊ธฐ๋ ์ด๋ป๊ฒ ์ ํํฉ๋๊น
์ฒญํฌ ํฌ๊ธฐ๋ ์ฑ๋ฅ๊ณผ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ์ ์ง์ ์ํฅ์ ์ค๋๋ค. ๋๋ฌด ์์ผ๋ฉด ์ปค๋ฐ์ด ๋ง์์ ธ ์ค๋ฒํค๋๊ฐ ์ปค์ง๊ณ , ๋๋ฌด ํฌ๋ฉด ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๊ณผ๋ํ๊ฒ ์ฌ์ฉํ๋ฉฐ ์ฅ์ ์ ๋กค๋ฐฑ ์๊ฐ์ด ๊ธธ์ด์ง๋๋ค.
// ์ฒญํฌ ํฌ๊ธฐ์ ๋์ ์ค์
@Configuration
public class ChunkSizingConfig {
// ๋๋ถ๋ถ ์ํฉ์์ ํฉ๋ฆฌ์ ์ธ ๊ธฐ๋ณธ๊ฐ
private static final int DEFAULT_CHUNK_SIZE = 100;
// ๊ฐ๋ฒผ์ด ์์ดํ
(ํ๋๊ฐ ์ ์ ๊ฒฝ์ฐ)
private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;
// ๋ฌด๊ฑฐ์ด ์์ดํ
(blob, ๋ฌธ์)
private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;
@Bean
public Step processLightDataStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<LightRecord> reader,
ItemWriter<LightRecord> writer) {
return new StepBuilder("processLightDataStep", jobRepository)
// ๊ฐ๋ฒผ์ด ์์ดํ
: ์ปค๋ฐ ํ์๋ฅผ ์ค์ด๊ธฐ ์ํด ๋ ํฐ ์ฒญํฌ
.<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Step processDocumentsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<Document> reader,
ItemProcessor<Document, ProcessedDocument> processor,
ItemWriter<ProcessedDocument> writer) {
return new StepBuilder("processDocumentsStep", jobRepository)
// ๋ฌด๊ฑฐ์ด ๋ฌธ์: ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ์ ์ ํํ๊ธฐ ์ํด ์์ ์ฒญํฌ
.<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}์ฒญํฌ๋น 100๊ฑด์ผ๋ก ์์ํ ๋ค ์งํ(์ปค๋ฐ ์๊ฐ, ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋, ๋กค๋ฐฑ ์๊ฐ)์ ๋ฐ๋ผ ์กฐ์ ํฉ๋๋ค. ๋ฆฌ์ค๋๋ก ๋ชจ๋ํฐ๋งํด ์ต์ ์ ์ ์ฐพ์ผ์ญ์์ค.
๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ์ํ ํํฐ์ ๋
์ง๋ฌธ 5: ํํฐ์ ๋์ด๋ ๋ฌด์์ด๋ฉฐ ์ธ์ ์ฌ์ฉํฉ๋๊น
ํํฐ์ ๋์ ๋ฐ์ดํฐ์ ์ ๋ ๋ฆฝ์ ์ธ ํํฐ์ ์ผ๋ก ๋๋์ด ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌํฉ๋๋ค. ๊ฐ ํํฐ์ ์ ์์ฒด ์ค๋ ๋(๋ก์ปฌ) ๋๋ ์๊ฒฉ worker์์ ์คํ๋ฉ๋๋ค. ์ด ๋ฐฉ์์ ์ฌ์์ ๋ฅ๋ ฅ์ ์์ง ์์ผ๋ฉด์ ์ฒ๋ฆฌ๋์ ํฌ๊ฒ ๋๋ฆฝ๋๋ค.
// ํํฐ์
๋๋ ์ก ์ค์
@Configuration
public class PartitionedJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public PartitionedJobConfig(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
@Bean
public Job partitionedImportJob(Step partitionedStep) {
return new JobBuilder("partitionedImportJob", jobRepository)
.start(partitionedStep)
.build();
}
// ๋งค๋์ Step: ํํฐ์
์ ์กฐ์จ
@Bean
public Step partitionedStep(Partitioner partitioner,
Step workerStep,
TaskExecutor taskExecutor) {
return new StepBuilder("partitionedStep", jobRepository)
// Partitioner๋ฅผ ํตํด ์์
๋ถํ
.partitioner("workerStep", partitioner)
// ๊ฐ ํํฐ์
๋ง๋ค ์คํํ Step
.step(workerStep)
// 8๊ฐ ๋ณ๋ ฌ ์ค๋ ๋
.taskExecutor(taskExecutor)
// ์์ฑํ ํํฐ์
์
.gridSize(8)
.build();
}
// ๋ณ๋ ฌ ์คํ์ ์ํ TaskExecutor
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("batch-partition-");
executor.initialize();
return executor;
}
}// ID ๋ฒ์ ๊ธฐ๋ฐ Partitioner
@Component
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// ๋ฐ์ดํฐ์
์ ๊ฒฝ๊ณ๋ฅผ ์กฐํ
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);
if (minId == null || maxId == null) {
return Map.of(); // ์ฒ๋ฆฌํ ๋ฐ์ดํฐ ์์
}
// ๊ฐ ํํฐ์
์ ํฌ๊ธฐ๋ฅผ ๊ณ์ฐ
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long start = minId + (i * range);
long end = Math.min(start + range - 1, maxId);
// ๊ฐ ํํฐ์
์ด ์์ ์ ๊ฒฝ๊ณ๋ฅผ ๋ฐ์
context.putLong("minId", start);
context.putLong("maxId", end);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}ํํฐ์ ๋์ ์์ดํ ์ด ์๋ก ๋ ๋ฆฝ์ ์ธ ํฐ ๋ฐ์ดํฐ์ ์ ์ ํฉํฉ๋๋ค. ๋๋ฆฐ ํํฐ์ ์ด ์ ์ฒด ์ก์ ์ง์ฐ์ํค์ง ์๋๋ก ๊ท ํ์ด ํ์ํฉ๋๋ค.
์ง๋ฌธ 6: ๋ก์ปฌ ํํฐ์ ๋๊ณผ ์๊ฒฉ ํํฐ์ ๋์ ์ฐจ์ด๋ ๋ฌด์์ ๋๊น
๋ก์ปฌ ํํฐ์ ๋์ ๋์ผ JVM์ ์ค๋ ๋ ํ์์ ๋ชจ๋ ํํฐ์ ์ ์คํํฉ๋๋ค. ์๊ฒฉ ํํฐ์ ๋์ ๋ฉ์์ง ๋ฏธ๋ค์จ์ด๋ฅผ ํตํด ์ฌ๋ฌ JVM(worker)์ผ๋ก ํํฐ์ ์ ๋ถ์ฐํฉ๋๋ค.
// ๋ฉ์์ง์ ํ์ฉํ ์๊ฒฉ ํํฐ์
๋ ์ค์
@Configuration
public class RemotePartitioningConfig {
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
MessageChannelPartitionHandler partitionHandler) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
// ์๊ฒฉ worker์ ํต์ ํ๋ ํธ๋ค๋ฌ
.partitionHandler(partitionHandler)
.build();
}
// PartitionHandler๊ฐ ExecutionContext๋ฅผ worker๋ก ์ ์ก
@Bean
public MessageChannelPartitionHandler partitionHandler(
MessagingTemplate messagingTemplate,
JobExplorer jobExplorer) {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("workerStep");
handler.setGridSize(4);
handler.setMessagingOperations(messagingTemplate);
handler.setJobExplorer(jobExplorer);
// worker ์๋ฃ ๋๊ธฐ ํ์์์
handler.setPollInterval(5000L);
return handler;
}
}// worker ์ธก ์ค์
@Configuration
public class WorkerConfiguration {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public WorkerConfiguration(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
// worker๋ ํํฐ์
์ ๋ฐ์ Step์ ์คํ
@Bean
public Step workerStep(ItemReader<OrderRecord> reader,
ItemProcessor<OrderRecord, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("workerStep", jobRepository)
.<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
// ํํฐ์
ํ๋ผ๋ฏธํฐ๋ฅผ ๋ฐ๊ธฐ ์ํด @StepScope ์ ์ฉ
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// ํํฐ์
๊ฒฝ๊ณ๋ฅผ ์ฌ์ฉํ๋ Reader
@Bean
@StepScope
public JdbcCursorItemReader<OrderRecord> partitionedReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcCursorItemReaderBuilder<OrderRecord>()
.name("partitionedOrderReader")
.dataSource(dataSource)
.sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
.preparedStatementSetter(ps -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.rowMapper(new OrderRecordRowMapper())
.build();
}
}Spring Boot ๋ฉด์ ์ค๋น๊ฐ ๋์ จ๋์?
์ธํฐ๋ํฐ๋ธ ์๋ฎฌ๋ ์ดํฐ, flashcards, ๊ธฐ์ ํ ์คํธ๋ก ์ฐ์ตํ์ธ์.
์ฅ์ ํ์ฉ๊ณผ ๋ณต๊ตฌ
์ง๋ฌธ 7: Spring Batch๊ฐ ์ ๊ณตํ๋ ์ฅ์ ํ์ฉ ๋ฉ์ปค๋์ฆ์ ๋ฌด์์ ๋๊น
Spring Batch๋ ์ํธ ๋ณด์์ ์ธ ์ธ ๊ฐ์ง ๋ฉ์ปค๋์ฆ์ ์ ๊ณตํฉ๋๋ค. skip(์คํจ ์์ดํ ๋ฌด์), retry(์๋ ์ฌ์๋), restart(์คํจํ ์ก ์ฌ๊ฐ)์ ๋๋ค. ์ด ๋ฉ์ปค๋์ฆ์ Step ์์ค์์ ์ค์ ํฉ๋๋ค.
// ์์ ํ ์ฅ์ ํ์ฉ ์ค์
@Configuration
public class FaultTolerantStepConfig {
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<DataRecord> reader,
ItemProcessor<DataRecord, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
SkipPolicy customSkipPolicy) {
return new StepBuilder("faultTolerantStep", jobRepository)
.<DataRecord, ProcessedRecord>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
// ์ฅ์ ํ์ฉ ๋ชจ๋ ํ์ฑํ
.faultTolerant()
// SKIP: ๊ฒ์ฆ ์ค๋ฅ๋ฅผ ์ต๋ 10๊ฑด๊น์ง ๋ฌด์
.skipLimit(10)
.skip(ValidationException.class)
.skip(DataIntegrityViolationException.class)
// ์ผ๋ถ ์ค๋ฅ๋ ์ ๋ ์คํตํ์ง ์์
.noSkip(FatalBatchException.class)
// RETRY: ์ผ์์ ์ค๋ฅ๋ ์ฌ์๋
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(DeadlockLoserDataAccessException.class)
// ์ฌ์๋ ์ฌ์ด์ ์ง์์ ๋ฐฑ์คํ
.backOffPolicy(exponentialBackOffPolicy())
// ์คํต์ ๋ก๊น
ํ๋ ๋ฆฌ์ค๋
.listener(skipListener())
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000); // 1์ด
policy.setMultiplier(2.0); // ์ฌ์๋๋ง๋ค ๋ ๋ฐฐ
policy.setMaxInterval(10000); // ์ต๋ 10์ด
return policy;
}
@Bean
public SkipListener<DataRecord, ProcessedRecord> skipListener() {
return new SkipListener<>() {
@Override
public void onSkipInRead(Throwable t) {
// ์ฝ์ ์ ์๋ ์์ดํ
๋ก๊น
}
@Override
public void onSkipInProcess(DataRecord item, Throwable t) {
// ์ฒ๋ฆฌ์ ์คํจํ ์์ดํ
๋ก๊น
}
@Override
public void onSkipInWrite(ProcessedRecord item, Throwable t) {
// ๊ธฐ๋ก์ ์คํจํ ์์ดํ
๋ก๊น
}
};
}
}์ฌ์๋๋ ์ผ์์ ์ค๋ฅ(๋คํธ์ํฌ ํ์์์, DB ๋ฐ๋๋ฝ)์ ์ ํฉํฉ๋๋ค. ์คํต์ ์ ์ฒด ์ฒ๋ฆฌ๋ฅผ ๋ง์ง ์์์ผ ํ๋ ๊ฐ๋ณ ๋ฐ์ดํฐ ์ค๋ฅ์ ์๋ง์ต๋๋ค.
์ง๋ฌธ 8: ์ปค์คํ SkipPolicy๋ฅผ ์ด๋ป๊ฒ ๊ตฌํํฉ๋๊น
์ปค์คํ SkipPolicy๋ ์ธ๋ฐํ ์์ฌ ๊ฒฐ์ ์ ๊ฐ๋ฅํ๊ฒ ํฉ๋๋ค. ์์ธ ํ์ , ์ค๋ฅ ์, ํน์ ๋น์ฆ๋์ค ๊ธฐ์ค์ ๋ฐ๋ผ ์คํตํ ์ ์์ต๋๋ค.
// ๊ณ ๊ธ ๋น์ฆ๋์ค ๋ก์ง์ ๊ฐ์ง SkipPolicy
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 100;
private static final double MAX_SKIP_PERCENTAGE = 0.05; // ์ต๋ 5%
private final AtomicInteger totalProcessed = new AtomicInteger(0);
private final AtomicInteger skipCount = new AtomicInteger(0);
@Override
public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
// ์น๋ช
์ ์ค๋ฅ๋ ์ ๋ ์คํตํ์ง ์์
if (exception instanceof FatalBatchException
|| exception instanceof OutOfMemoryError) {
return false;
}
// ์คํต ์์ ์ ๋ ์ํ
if (skipCountSoFar >= MAX_SKIP_COUNT) {
return false; // ์ก ์ค๋จ
}
// ๋น์จ ์ํ
int total = totalProcessed.get();
if (total > 1000) { // ์๋ฐ์
ํ์๋ง ์ ์ฉ
double skipPercentage = (double) skipCountSoFar / total;
if (skipPercentage > MAX_SKIP_PERCENTAGE) {
return false; // ๋น์จ์ ์ผ๋ก ์ค๋ฅ๊ฐ ๋๋ฌด ๋ง์
}
}
// ๊ฒ์ฆ ๋ฐ ๋ฐ์ดํฐ ์ค๋ฅ ์คํต
return exception instanceof ValidationException
|| exception instanceof DataFormatException
|| exception instanceof IllegalArgumentException;
}
// ์งํ ์ถ์ ์ ์ํด ๋ฆฌ์ค๋์์ ํธ์ถ
public void incrementProcessed() {
totalProcessed.incrementAndGet();
}
}์ง๋ฌธ 9: ์คํจํ ์ก์ ์ฌ์์์ ์ด๋ป๊ฒ ๋์ํฉ๋๊น
JobRepository๋ ๊ฐ ์คํ ์ํ๋ฅผ ์ ์ฅํฉ๋๋ค. ์ฌ์์ ์ Spring Batch๋ ๋ง์ง๋ง์ผ๋ก ์ปค๋ฐ๋ ์ฒญํฌ๋ฅผ ์๋ณํ๊ณ ๊ทธ ์ง์ ๋ถํฐ ์ฌ๊ฐํฉ๋๋ค. ์ฑ๊ณต์ ์ผ๋ก ์ฒ๋ฆฌ๋ ์์ดํ ์ ๋ค์ ์ฒ๋ฆฌ๋์ง ์์ต๋๋ค.
// ์ก ์ฌ์์ ๊ด๋ฆฌ ์๋น์ค
@Service
public class JobRestartService {
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final Job importJob;
public JobRestartService(JobLauncher jobLauncher,
JobExplorer jobExplorer,
JobRepository jobRepository,
@Qualifier("importJob") Job importJob) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.importJob = importJob;
}
public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
// ์คํจํ ์คํ ์กฐํ
JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);
if (failedExecution == null) {
throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
}
// ์ก ์ฌ์์ ๊ฐ๋ฅ ์ฌ๋ถ ํ์ธ
if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
throw new IllegalStateException("Only FAILED jobs can be restarted");
}
// ์๋ ์คํ๊ณผ ๋์ผํ ํ๋ผ๋ฏธํฐ ์ฌ์ฉ
JobParameters originalParams = failedExecution.getJobParameters();
// ์ก ์ฌ์คํ - ๋ง์ง๋ง ์ฒดํฌํฌ์ธํธ์์ ์๋ ์ฌ๊ฐ
return jobLauncher.run(importJob, originalParams);
}
public List<JobExecution> findRestartableJobs() {
// ์์ง ์ฌ์์๋์ง ์์ ๋ชจ๋ FAILED ์คํ ๋์ด
return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.FAILED)
.filter(this::isRestartable)
.toList();
}
private boolean isRestartable(JobExecution execution) {
// ๋ ์ต์ ์ ์ฑ๊ณต ์คํ์ด ์๋์ง ํ์ธ
JobInstance instance = execution.getJobInstance();
return jobExplorer.getJobExecutions(instance).stream()
.noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
}
}์ก์ JobParameters๊ฐ ๋์ผํด์ผ๋ง ์ฌ์์ํ ์ ์์ต๋๋ค. ํ๋ผ๋ฏธํฐ๋ฅผ ๋ณ๊ฒฝํ๋ฉด ์๋ก์ด ์ก ์ธ์คํด์ค๊ฐ ์์ฑ๋์ด ์งํ ์ด๋ ฅ์ด ์ฌ๋ผ์ง๋๋ค.
ํ์ฅ๊ณผ ์ต์ ํ
์ง๋ฌธ 10: ์ด๋ค ํ์ฅ ์ ๋ต์ด ์์ต๋๊น
Spring Batch๋ ๋ค ๊ฐ์ง ์ ๋ต์ ์ ๊ณตํฉ๋๋ค. multi-threaded step(์ฌ๋ฌ ์ค๋ ๋๊ฐ ๋ณ๋ ฌ๋ก ์ฝ๊ธฐ), parallel steps(๋ ๋ฆฝ Step์ ๋ณ๋ ฌ ์คํ), remote chunking(๋ถ์ฐ ์ฒ๋ฆฌ), partitioning(๋ฐ์ดํฐ ๋ถ์ฐ)์ ๋๋ค.
// ๋ฉํฐ ์ค๋ ๋ Step: ์ฌ๋ฌ ์ค๋ ๋๊ฐ ๊ฐ์ ๋ฐ์ดํฐ์
์ ์ฒ๋ฆฌ
@Configuration
public class MultiThreadedStepConfig {
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<Record> reader,
ItemProcessor<Record, ProcessedRecord> processor,
ItemWriter<ProcessedRecord> writer,
TaskExecutor taskExecutor) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Record, ProcessedRecord>chunk(100, transactionManager)
// ์ฃผ์: reader๋ thread-safeํด์ผ ํจ
.reader(synchronizedReader(reader))
.processor(processor)
.writer(writer)
// 4๊ฐ ์ค๋ ๋๊ฐ ์ฒญํฌ๋ฅผ ๋ณ๋ ฌ ์ฒ๋ฆฌ
.taskExecutor(taskExecutor)
.throttleLimit(4)
.build();
}
// reader๋ฅผ thread-safe๋ก ๋ง๋๋ ๋ํผ
private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate((ItemStreamReader<Record>) reader);
return syncReader;
}
}// ๋
๋ฆฝ Step์ ๋ณ๋ ฌ ์คํ
@Configuration
public class ParallelStepsConfig {
@Bean
public Job parallelJob(JobRepository jobRepository,
Step loadCustomersStep,
Step loadProductsStep,
Step loadOrdersStep,
Step processDataStep) {
// ๋ณ๋ ฌ ํ๋ก์ฐ: customers์ products๋ฅผ ๋์์ ์ ์ฌ
Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
.start(loadCustomersStep)
.build();
Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
.start(loadProductsStep)
.build();
Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
.start(loadOrdersStep)
.build();
// Split์ด ํ๋ก์ฐ๋ฅผ ๋ณ๋ ฌ ์คํ
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("parallelLoadFlow")
.split(new SimpleAsyncTaskExecutor())
.add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
.build())
// ๋ณ๋ ฌ ์ ์ฌ ํ ์์ฐจ ์ฒ๋ฆฌ
.next(processDataStep)
.build()
.build();
}
}๋ฉํฐ ์ค๋ ๋ฉ์ reader๋ฅผ ๋๊ธฐํํ ์ ์๋ ๊ฒฝ์ฐ์ ์ ํฉํฉ๋๋ค. ๋์ฉ๋์์๋ ๊ฐ ํํฐ์ ์ด ์์ฒด reader๋ฅผ ๊ฐ์ง๋ฏ๋ก ๊ฒฝํฉ์ด ์๋ ํํฐ์ ๋์ด ๋ ์ข์ต๋๋ค.
์ง๋ฌธ 11: ์ก ์ฑ๋ฅ์ ์ด๋ป๊ฒ ๋ชจ๋ํฐ๋งํฉ๋๊น
Spring Batch๋ ๋ฆฌ์ค๋์ JobRepository๋ก ๋ฉํธ๋ฆญ์ ๋ ธ์ถํฉ๋๋ค. Micrometer ํตํฉ์ผ๋ก Prometheus, Grafana ๋ฑ ๋ค์ํ ๋ชจ๋ํฐ๋ง ์์คํ ์ ๋ด๋ณด๋ผ ์ ์์ต๋๋ค.
// Micrometer ๊ธฐ๋ฐ ๋ชจ๋ํฐ๋ง ์ค์
@Configuration
public class BatchMetricsConfig {
private final MeterRegistry meterRegistry;
public BatchMetricsConfig(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Bean
public JobExecutionListener metricsJobListener() {
return new JobExecutionListener() {
private Timer.Sample jobTimer;
@Override
public void beforeJob(JobExecution jobExecution) {
// ์ก ์์ ์๊ฐ ํ์ด๋จธ ์์
jobTimer = Timer.start(meterRegistry);
Counter.builder("batch.job.started")
.tag("job", jobExecution.getJobInstance().getJobName())
.register(meterRegistry)
.increment();
}
@Override
public void afterJob(JobExecution jobExecution) {
// ์ด ์์ ์๊ฐ ๊ธฐ๋ก
jobTimer.stop(Timer.builder("batch.job.duration")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry));
// ์ํ๋ณ ์ก ์นด์ดํฐ
Counter.builder("batch.job.completed")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry)
.increment();
}
};
}
@Bean
public StepExecutionListener metricsStepListener() {
return new StepExecutionListener() {
@Override
public void afterStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
String stepName = stepExecution.getStepName();
// ์ฒ๋ฆฌ๋ ๋ฉํธ๋ฆญ
Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
.tag("job", jobName)
.tag("step", stepName)
.register(meterRegistry);
return null;
}
};
}
}์ง๋ฌธ 12: ํํฐ์ ๋์ ํํ ํจ์ ์ ๋ฌด์์ ๋๊น
์์ฃผ ๋ฐ์ํ๋ ์ค์์๋ ๋ถ๊ท ํํ ํํฐ์ (ํ ํํฐ์ ์ ๋ฐ์ดํฐ์ 90%๊ฐ ๋ชฐ๋ฆผ), thread-safeํ์ง ์์ reader, ํํฐ์ ๊ฐ ์๋ชป๋ ์ํ ๊ด๋ฆฌ๊ฐ ์์ต๋๋ค.
// ๋ถํ๋ฅผ ์ค์ ๋ก ๊ท ํ ์ก๋ Partitioner
@Component
public class BalancedPartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// ์ฒ๋ฆฌํ ์ ์ฒด ์์ดํ
์ ๊ณ์ฐ
Integer totalCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);
if (totalCount == null || totalCount == 0) {
return Map.of();
}
// ํํฐ์
๋น ๋ชฉํ ํฌ๊ธฐ ๊ณ์ฐ
int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);
Map<String, ExecutionContext> partitions = new HashMap<>();
// ๊ท ํ ์กํ ํํฐ์
์ ์ํด OFFSET/LIMIT ์ฌ์ฉ
// ๋ฒ์ ๋ถํ ๋ณด๋ค ๋น์ฉ์ ํฌ์ง๋ง ๊ท ํ์ ๋ณด์ฅ
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("offset", i * itemsPerPartition);
context.putInt("limit", itemsPerPartition);
context.putInt("partitionNumber", i);
partitions.put("partition" + i, context);
}
return partitions;
}
}
// OffsetBasedReader.java
// ์คํ์
๊ธฐ๋ฐ ํํฐ์
๋๊ณผ ํธํ๋๋ Reader
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {
private final JdbcTemplate jdbcTemplate;
private Iterator<OrderRecord> iterator;
@Value("#{stepExecutionContext['offset']}")
private int offset;
@Value("#{stepExecutionContext['limit']}")
private int limit;
public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void open(ExecutionContext executionContext) {
// ์ด ํํฐ์
์ ํ ๋น๋ ๋ถ๋ถ๋ง ์ ํํ ๋ก๋ฉ
List<OrderRecord> records = jdbcTemplate.query(
"SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
new OrderRecordRowMapper(),
limit, offset
);
this.iterator = records.iterator();
}
@Override
public OrderRecord read() {
return iterator.hasNext() ? iterator.next() : null;
}
@Override
public void update(ExecutionContext executionContext) {
// ํ์ ์ ์ฌ์์์ฉ ์ํ ์ ์ฅ
}
@Override
public void close() {
// ์ ๋ฆฌ
}
}์๋์ด๋ฅผ ์ํ ๊ณ ๊ธ ์ง๋ฌธ
์ง๋ฌธ 13: ์ก ๊ฐ ์์กด์ฑ์ ์ด๋ป๊ฒ ๊ด๋ฆฌํฉ๋๊น
Spring Batch๋ ์ก ๊ฐ ์์กด์ฑ์ ์์ฒด์ ์ผ๋ก ๊ด๋ฆฌํ์ง ์์ต๋๋ค. ์ธ๋ถ ์ค์ผ์คํธ๋ ์ดํฐ(Airflow, Kubernetes CronJob)๋ JobExplorer๋ฅผ ํ์ฉํ ์ปค์คํ ๊ตฌํ์ด ํด๋ฒ์ ๋๋ค.
// ์ก ๊ฐ ์์กด์ฑ ๊ด๋ฆฌ
@Service
public class JobDependencyService {
private final JobExplorer jobExplorer;
private final JobLauncher jobLauncher;
private final Map<String, Job> jobs;
public JobDependencyService(JobExplorer jobExplorer,
JobLauncher jobLauncher,
Map<String, Job> jobs) {
this.jobExplorer = jobExplorer;
this.jobLauncher = jobLauncher;
this.jobs = jobs;
}
public JobExecution runWithDependencies(String jobName,
JobParameters params,
List<String> dependsOn) throws Exception {
// ๋ชจ๋ ์์กด์ฑ์ด ์ฑ๊ณตํ๋์ง ํ์ธ
for (String dependency : dependsOn) {
if (!hasSuccessfulExecution(dependency, params)) {
throw new JobExecutionException(
"Dependency not satisfied: " + dependency);
}
}
Job job = jobs.get(jobName);
if (job == null) {
throw new IllegalArgumentException("Unknown job: " + jobName);
}
return jobLauncher.run(job, params);
}
private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
// ๋์ผํ ๋น์ฆ๋์ค ํ๋ผ๋ฏธํฐ๋ก COMPLETED์ธ ์คํ ๊ฒ์
return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
.stream()
.flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
.filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
.anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
}
private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
// ๋น์ฆ๋์ค ํ๋ผ๋ฏธํฐ ๋น๊ต(์คํ ํ์์คํฌํ ๋ฌด์)
String actualDate = actual.getString("businessDate");
String expectedDate = expected.getString("businessDate");
return Objects.equals(actualDate, expectedDate);
}
}์ง๋ฌธ 14: Spring Batch ์ก์ ์ด๋ป๊ฒ ํจ๊ณผ์ ์ผ๋ก ํ ์คํธํฉ๋๊น
Spring Batch ์ก ํ ์คํธ๋ ๊ณ์ธต์ ์ ๊ทผ์ด ํ์ํฉ๋๋ค. ์ปดํฌ๋ํธ ๋จ์ ์ ๋ ํ ์คํธ(reader, processor, writer), Step์ ํตํฉ ํ ์คํธ, ์ก ์ ์ฒด์ ์ข ๋จ๊ฐ ํ ์คํธ์ ๋๋ค.
// ํ๋ก์ธ์ ์ ๋ ํ
์คํธ
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {
@Mock
private PricingService pricingService;
@Mock
private ValidationService validationService;
@InjectMocks
private OrderItemProcessor processor;
@Test
void shouldProcessValidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(true);
when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));
// When
ProcessedOrder result = processor.process(input);
// Then
assertThat(result).isNotNull();
assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
}
@Test
void shouldFilterInvalidOrder() {
// Given
OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
when(validationService.isValid(input)).thenReturn(false);
// When
ProcessedOrder result = processor.process(input);
// Then - null์ ํํฐ๋ง๋จ์ ์๋ฏธ
assertThat(result).isNull();
verify(pricingService, never()).calculatePrice(any());
}
}// ์ก ์ ์ฒด์ ๋ํ ํตํฉ ํ
์คํธ
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JobRepositoryTestUtils jobRepositoryTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeEach
void setup() {
// ํ
์คํธ ์ฌ์ด์ ๋ฉํ๋ฐ์ดํฐ ์ ๋ฆฌ
jobRepositoryTestUtils.removeJobExecutions();
// ํ
์คํธ ๋ฐ์ดํฐ ์ด๊ธฐํ
jdbcTemplate.execute("DELETE FROM processed_orders");
jdbcTemplate.execute("DELETE FROM orders");
}
@Test
void shouldCompleteJobSuccessfully() throws Exception {
// Given - ํ
์คํธ ๋ฐ์ดํฐ
insertTestOrders(100);
// When
JobParameters params = new JobParametersBuilder()
.addLocalDate("businessDate", LocalDate.now())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// Then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
assertThat(countProcessedOrders()).isEqualTo(100);
}
@Test
void shouldHandleEmptyDataset() throws Exception {
// Given - ๋ฐ์ดํฐ ์์
// When
JobExecution execution = jobLauncherTestUtils.launchJob();
// Then - ๋ฐ์ดํฐ๊ฐ ์์ด๋ ์ก์ ์ฑ๊ณต
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
@Test
void shouldRestartFromFailurePoint() throws Exception {
// Given - ์ฒ๋ฆฌ ๋์ค์ ์ค๋ฅ ์๋ฎฌ๋ ์ด์
insertTestOrders(100);
insertPoisonOrder(50); // ์ค๋ฅ ๋ฐ์
// When - ์ฒซ ์คํ ์คํจ
JobExecution firstExecution = jobLauncherTestUtils.launchJob();
assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);
// ๋ฐ์ดํฐ ์์
removePoisonOrder(50);
// When - ์ฌ์์
JobExecution restartExecution = jobLauncherTestUtils.launchJob(
firstExecution.getJobParameters());
// Then - ์คํจ ์ง์ ๋ถํฐ ์ฌ๊ฐ
assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
private void insertTestOrders(int count) {
for (int i = 1; i <= count; i++) {
jdbcTemplate.update(
"INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
i, i * 10, BigDecimal.valueOf(i * 10));
}
}
private int countProcessedOrders() {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders", Integer.class);
}
}์ง๋ฌธ 15: ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ธฐ ์ฑ๋ฅ์ ์ด๋ป๊ฒ ์ต์ ํํฉ๋๊น
์ฐ๊ธฐ๋ ์ข ์ข ๋ณ๋ชฉ์ด ๋ฉ๋๋ค. ์ต์ ํ ๋ฐฉ๋ฒ์ผ๋ก๋ JDBC ๋ฐฐ์น ์ธ์ํธ, ์ ์ฌ ๋์์ ์ ์ฝ ์กฐ๊ฑด ๋นํ์ฑํ, ์คํ ์ด์ง ํ ์ด๋ธ ํ์ฉ์ด ์์ต๋๋ค.
// ๋์ฉ๋์ ์ํ ์ต์ ํ๋ Writer
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
private final DataSource dataSource;
public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
this.jdbcTemplate = jdbcTemplate;
this.dataSource = dataSource;
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
List<? extends ProcessedOrder> items = chunk.getItems();
if (items.isEmpty()) {
return;
}
// ๋ฐฐ์น์ ํจ๊ป PreparedStatement ์ฌ์ฉ
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
"VALUES (?, ?, ?, ?)")) {
for (ProcessedOrder order : items) {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
ps.addBatch();
}
// ๋ชจ๋ INSERT๋ฅผ ํ ๋ฒ์ ๋คํธ์ํฌ ํธ์ถ๋ก ์คํ
ps.executeBatch();
}
}
}
// StagingTableWriter.java
// ์ด๋์ฉ๋์ ์ํ ์คํ
์ด์ง ํ
์ด๋ธ ํจํด
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {
private final JdbcTemplate jdbcTemplate;
private String stagingTable;
public StagingTableWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void beforeStep(StepExecution stepExecution) {
// ์ด Step์ ์ํ ์์ ํ
์ด๋ธ ์์ฑ
stagingTable = "staging_orders_" + stepExecution.getId();
jdbcTemplate.execute(
"CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
}
@Override
public void write(Chunk<? extends ProcessedOrder> chunk) {
// ์คํ
์ด์ง ํ
์ด๋ธ์ ๊ธฐ๋ก(FK ์ ์ฝ ์์)
String sql = "INSERT INTO " + stagingTable +
" (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
(ps, order) -> {
ps.setLong(1, order.orderId());
ps.setLong(2, order.customerId());
ps.setBigDecimal(3, order.finalPrice());
ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
});
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
// ์ต์ข
ํ
์ด๋ธ๋ก ์ผ๊ด ๋ณต์ฌ
jdbcTemplate.execute(
"INSERT INTO processed_orders SELECT * FROM " + stagingTable);
}
// ์คํ
์ด์ง ํ
์ด๋ธ ์ ๋ฆฌ
jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
return stepExecution.getExitStatus();
}
}๊ฒฐ๋ก
๊ธฐ์ ๋ฉด์ ์์ Spring Batch 5๋ฅผ ์ ๋ณตํ๋ ค๋ฉด ๋ด๋ถ ๋ฉ์ปค๋์ฆ์ ๋ํ ๊น์ ์ดํด๊ฐ ํ์ํฉ๋๋ค.
โ ์ํคํ ์ฒ: Job โ Step โ Chunk(Reader, Processor, Writer)
โ ์ฒญํฌ ์ฒ๋ฆฌ: ํฌ๊ธฐ ์ฐ์ , ๋ผ์ดํ์ฌ์ดํด, ํธ๋์ญ์
โ ํํฐ์ ๋: ๋ก์ปฌ vs ์๊ฒฉ, ํํฐ์ ๊ท ํ
โ ์ฅ์ ํ์ฉ: skip, retry, restart์ ์ ํฉํ ์ ์ฑ ์ ์ฉ
โ ํ์ฅ: ๋ฉํฐ ์ค๋ ๋ฉ, parallel steps, remote chunking
โ ํ ์คํธ: ์ ๋, ํตํฉ, ์ข ๋จ๊ฐ
โ ์ต์ ํ: ๋ฐฐ์น ์ฐ๊ธฐ, ์คํ ์ด์ง ํ ์ด๋ธ, ๋ชจ๋ํฐ๋ง
๊ณ ๊ธ ์ง๋ฌธ์ ๋ฐ์ดํฐ ์, ์๊ฐ ์ ์ฝ, ์ค๋ฅ ํ์ฉ ์์ค, ์ฌ์ฉ ๊ฐ๋ฅํ ์ธํ๋ผ ๊ฐ์ ๋งฅ๋ฝ์ ๋ฐ๋ผ ์ํคํ ์ฒ ๊ฒฐ์ ์ ์ ๋นํํ๋ ๋ฅ๋ ฅ์ ํ๊ฐํฉ๋๋ค.
์ฐ์ต์ ์์ํ์ธ์!
๋ฉด์ ์๋ฎฌ๋ ์ดํฐ์ ๊ธฐ์ ํ ์คํธ๋ก ์ง์์ ํ ์คํธํ์ธ์.
ํ๊ทธ
๊ณต์
๊ด๋ จ ๊ธฐ์ฌ

Spring Modulith: ๋ชจ๋๋ฌ ๋ชจ๋๋ฆฌ์ค ์ํคํ ์ฒ ํด์ค
Spring Modulith๋ก ์๋ฐ ๋ชจ๋๋ฌ ๋ชจ๋๋ฆฌ์ค๋ฅผ ๊ตฌ์ถํ๋ ๋ฐฉ๋ฒ์ ๋ฐฐ์๋๋ค. ์ํคํ ์ฒ, ๋ชจ๋, ๋น๋๊ธฐ ์ด๋ฒคํธ, Spring Boot 3 ์์ ๋ก ์ดํด๋ณด๋ ํ ์คํธ.

Spring Boot ๋ฉด์ : ํธ๋์ญ์ ์ ํ ์ค๋ช
Spring Boot ํธ๋์ญ์ ์ ํ ๋ง์คํฐํ๊ธฐ: REQUIRED, REQUIRES_NEW, NESTED ๋ฑ. ์ฝ๋ ์์ ์ ์ผ๋ฐ์ ์ธ ํจ์ ์ ํฌํจํ 12๊ฐ์ง ๋ฉด์ ์ง๋ฌธ.

Spring Security 6: ์๋ฒฝํ JWT ์ธ์ฆ ๊ฐ์ด๋
Spring Security 6๋ก JWT ์ธ์ฆ์ ๊ตฌํํ๋ ์ค์ฉ ๊ฐ์ด๋. ๊ตฌ์ฑ, ํ ํฐ ์์ฑ, ๊ฒ์ฆ, ๋ณด์ ๋ชจ๋ฒ ์ฌ๋ก๋ฅผ ๋ค๋ฃน๋๋ค.