Spring Batch 5 ๋ฉด์ ‘: ํŒŒํ‹ฐ์…”๋‹, ์ฒญํฌ, ์žฅ์•  ํ—ˆ์šฉ

Spring Batch 5 ๋ฉด์ ‘์„ ์ •๋ณตํ•˜์„ธ์š”. ํŒŒํ‹ฐ์…”๋‹, ์ฒญํฌ ์ฒ˜๋ฆฌ, ์žฅ์•  ํ—ˆ์šฉ์— ๊ด€ํ•œ 15๊ฐ€์ง€ ํ•ต์‹ฌ ์งˆ๋ฌธ๊ณผ Java 21 ์˜ˆ์ œ๋ฅผ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

Spring Batch 5 ๋ฉด์ ‘: ํŒŒํ‹ฐ์…”๋‹, ์ฒญํฌ, ์žฅ์•  ํ—ˆ์šฉ

Spring Batch 5๋Š” Spring ์ƒํƒœ๊ณ„์˜ ๋Œ€๊ทœ๋ชจ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ํ•ต์‹ฌ์ž…๋‹ˆ๋‹ค. ๊ธฐ์ˆ  ๋ฉด์ ‘์—์„œ๋Š” ๊ฒฌ๊ณ ํ•˜๊ณ  ํ™•์žฅ ๊ฐ€๋Šฅํ•˜๋ฉฐ ์žฅ์• ์— ๊ฐ•ํ•œ ์žก์„ ์„ค๊ณ„ํ•˜๋Š” ๋Šฅ๋ ฅ์„ ํ‰๊ฐ€ํ•ฉ๋‹ˆ๋‹ค. ํŒŒํ‹ฐ์…”๋‹, ์ฒญํฌ ์ง€ํ–ฅ ์ฒ˜๋ฆฌ, ์žฅ์•  ํ—ˆ์šฉ ๋ฉ”์ปค๋‹ˆ์ฆ˜์— ๋Œ€ํ•œ ์ˆ™๋ จ๋„๊ฐ€ ์‹œ๋‹ˆ์–ด ๊ฐœ๋ฐœ์ž๋ฅผ ๋‘๋“œ๋Ÿฌ์ง€๊ฒŒ ํ•ฉ๋‹ˆ๋‹ค.

๋ฉด์ ‘ ํ•ต์‹ฌ ํฌ์ธํŠธ

์ฑ„์šฉ ๋‹ด๋‹น์ž๋Š” ๊นŠ์€ ์ดํ•ด๋ฅผ ์‹œํ—˜ํ•ฉ๋‹ˆ๋‹ค. ์™œ remote chunking ๋Œ€์‹  partitioning์„ ์„ ํƒํ•ด์•ผ ํ•ฉ๋‹ˆ๊นŒ? ์ฒญํฌ ํฌ๊ธฐ๋ฅผ ์–ด๋–ป๊ฒŒ ์‚ฐ์ •ํ•ด์•ผ ํ•ฉ๋‹ˆ๊นŒ? ์ด๋Ÿฌํ•œ ์•„ํ‚คํ…์ฒ˜ ๊ฒฐ์ •์€ ์‹ค์ œ ํ”„๋กœ๋•์…˜ ๊ฒฝํ—˜์„ ๋“œ๋Ÿฌ๋ƒ…๋‹ˆ๋‹ค.

Spring Batch 5์˜ ํ•ต์‹ฌ ์•„ํ‚คํ…์ฒ˜

์งˆ๋ฌธ 1: Spring Batch์˜ ์ฃผ์š” ๊ตฌ์„ฑ ์š”์†Œ๋Š” ๋ฌด์—‡์ž…๋‹ˆ๊นŒ

Spring Batch ์•„ํ‚คํ…์ฒ˜๋Š” ์„ธ ๊ณ„์ธต์œผ๋กœ ๊ตฌ์„ฑ๋ฉ๋‹ˆ๋‹ค. ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜(์žก๊ณผ ๋น„์ฆˆ๋‹ˆ์Šค ์ฝ”๋“œ), Batch Core(์žก์„ ์‹คํ–‰ํ•˜๊ณ  ์ œ์–ดํ•˜๋Š” ๋Ÿฐํƒ€์ž„ ํด๋ž˜์Šค), ์ธํ”„๋ผ(reader, writer, RetryTemplate ๊ฐ™์€ ๊ณต์šฉ ์„œ๋น„์Šค)์ž…๋‹ˆ๋‹ค.

BatchJobConfig.javajava
// Java 21 ๊ธฐ๋ฐ˜ Spring Batch 5 ์žก ์„ค์ •
@Configuration
public class BatchJobConfig {

    // JobRepository๋Š” ์‹คํ–‰ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•ฉ๋‹ˆ๋‹ค
    // ์žก ์žฌ์‹œ์ž‘๊ณผ ๋ชจ๋‹ˆํ„ฐ๋ง์„ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•ฉ๋‹ˆ๋‹ค
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public BatchJobConfig(JobRepository jobRepository,
                          PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // Job์€ ์ „์ฒด ๋ฐฐ์น˜ ํ”„๋กœ์„ธ์Šค๋ฅผ ์บก์Аํ™”ํ•ฉ๋‹ˆ๋‹ค
    // ์ˆœ์ฐจ์ ์œผ๋กœ ์‹คํ–‰๋˜๋Š” ํ•˜๋‚˜ ์ด์ƒ์˜ Step์œผ๋กœ ๊ตฌ์„ฑ๋ฉ๋‹ˆ๋‹ค
    @Bean
    public Job importUserJob(Step processUsersStep, Step cleanupStep) {
        return new JobBuilder("importUserJob", jobRepository)
                .start(processUsersStep)      // ๋ฉ”์ธ ์ฒ˜๋ฆฌ Step
                .next(cleanupStep)             // ์ •๋ฆฌ Step
                .build();
    }

    // Step์€ ๋…๋ฆฝ๋œ ์ž‘์—… ๋‹จ์œ„๋ฅผ ๋‚˜ํƒ€๋ƒ…๋‹ˆ๋‹ค
    // ๋‘ ๊ฐ€์ง€ ๋ชจ๋ธ: Tasklet(๋‹จ์ผ ์ž‘์—…) ๋˜๋Š” Chunk(๋ฐ˜๋ณต ์ฒ˜๋ฆฌ)
    @Bean
    public Step processUsersStep(ItemReader<UserRecord> reader,
                                  ItemProcessor<UserRecord, User> processor,
                                  ItemWriter<User> writer) {
        return new StepBuilder("processUsersStep", jobRepository)
                .<UserRecord, User>chunk(100, transactionManager)  // 100๊ฑด๋งˆ๋‹ค ์ปค๋ฐ‹
                .reader(reader)       // ์›๋ณธ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์Œ
                .processor(processor) // ๊ฐ ์•„์ดํ…œ์„ ๋ณ€ํ™˜
                .writer(writer)       // 100๊ฑด ๋‹จ์œ„๋กœ ๊ธฐ๋ก
                .build();
    }
}

JobRepository๋Š” ์‹คํ–‰ ์ƒํƒœ๋ฅผ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์˜์†ํ™”ํ•ฉ๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ์˜์†์„ฑ ๋•๋ถ„์— ์‹คํŒจํ•œ ์žก์„ ์ •ํ™•ํžˆ ๋ฉˆ์ถ˜ ์ง€์ ๋ถ€ํ„ฐ ์žฌ์‹œ์ž‘ํ•  ์ˆ˜ ์žˆ๊ณ , ์ด๋ฏธ ์ปค๋ฐ‹๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค์‹œ ์ฒ˜๋ฆฌํ•  ํ•„์š”๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.

์งˆ๋ฌธ 2: Tasklet๊ณผ ์ฒญํฌ ์ง€ํ–ฅ ์ฒ˜๋ฆฌ์˜ ์ฐจ์ด๋Š” ๋ฌด์—‡์ž…๋‹ˆ๊นŒ

Tasklet์€ ๋ฐ˜๋ณตํ•˜์ง€ ์•Š๋Š” ๋‹จ์ผ ๋™์ž‘์„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค. ํŒŒ์ผ ์‚ญ์ œ, ์ €์žฅ ํ”„๋กœ์‹œ์ € ํ˜ธ์ถœ, ์•Œ๋ฆผ ๋ฉ”์ผ ๋ฐœ์†ก ๋“ฑ์ด ๊ทธ ์˜ˆ์ž…๋‹ˆ๋‹ค. Chunk๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๊ด€๋ฆฌ ๊ฐ€๋Šฅํ•œ ๋ฐฐ์น˜๋กœ ๋‚˜๋ˆ„์–ด ๋Œ€์šฉ๋Ÿ‰์„ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค.

CleanupTasklet.javajava
// Tasklet: ๋ฐ˜๋ณต ์—†๋Š” ๋‹จ์ผ ๋™์ž‘
@Component
public class CleanupTasklet implements Tasklet {

    private final Path tempDirectory = Path.of("/tmp/batch-work");

    @Override
    public RepeatStatus execute(StepContribution contribution,
                                 ChunkContext chunkContext) throws Exception {
        // ์ฒ˜๋ฆฌ ๊ณผ์ •์—์„œ ์ƒ๊ธด ์ž„์‹œ ํŒŒ์ผ์„ ๋ชจ๋‘ ์‚ญ์ œ
        try (var files = Files.walk(tempDirectory)) {
            files.filter(Files::isRegularFile)
                 .forEach(this::deleteQuietly);
        }

        // FINISHED๋Š” tasklet์ด ์ž‘์—…์„ ์™„๋ฃŒํ–ˆ์Œ์„ ์˜๋ฏธ
        // CONTINUABLE์€ ์‹คํ–‰์„ ๋‹ค์‹œ ์‹œ์ž‘(ํด๋ง์— ์œ ์šฉ)
        return RepeatStatus.FINISHED;
    }

    private void deleteQuietly(Path file) {
        try {
            Files.delete(file);
        } catch (IOException e) {
            // ๋กœ๊ทธ๋งŒ ๋‚จ๊ธฐ๊ณ  ๊ณ„์† - ํŒŒ์ผ ํ•˜๋‚˜๋กœ ์žก์„ ์‹คํŒจ์‹œํ‚ค์ง€ ์•Š์Œ
        }
    }
}
ChunkProcessingConfig.javajava
// ์ฒญํฌ ์ฒ˜๋ฆฌ: ๋Œ€์šฉ๋Ÿ‰ ์ฒ˜๋ฆฌ
@Configuration
public class ChunkProcessingConfig {

    @Bean
    public Step processOrdersStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<OrderRecord> reader,
                                   ItemProcessor<OrderRecord, ProcessedOrder> processor,
                                   ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("processOrdersStep", jobRepository)
                // 500๊ฐœ ์ฒญํฌ: 500๊ฑด ์ฝ๊ธฐ, ์ฒ˜๋ฆฌ, ๊ธฐ๋ก ํ›„ ์ปค๋ฐ‹
                .<OrderRecord, ProcessedOrder>chunk(500, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // ์ง„ํ–‰ ์ƒํ™ฉ ๋ชจ๋‹ˆํ„ฐ๋ง์šฉ ๋ฆฌ์Šค๋„ˆ
                .listener(new ChunkProgressListener())
                .build();
    }
}

์ฒญํฌ ์ง€ํ–ฅ ์ฒ˜๋ฆฌ๋Š” ์ค‘์š”ํ•œ ์ด์ ์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ ์ตœ์ ํ™”(ํ˜„์žฌ ์ฒญํฌ๋งŒ ๋ฉ”๋ชจ๋ฆฌ์— ๋ณด๊ด€), ์„ธ๋ถ„ํ™”๋œ ํŠธ๋žœ์žญ์…˜(์ฒญํฌ ๋‹จ์œ„ ์ปค๋ฐ‹), ๋งˆ์ง€๋ง‰ ์ปค๋ฐ‹๋œ ์ฒญํฌ์—์„œ์˜ ์žฅ์•  ๋ณต๊ตฌ์ž…๋‹ˆ๋‹ค.

์ฒญํฌ ์ง€ํ–ฅ ์ฒ˜๋ฆฌ ์‹ฌํ™”

์งˆ๋ฌธ 3: ์ฒญํฌ ๋ผ์ดํ”„์‚ฌ์ดํด์€ ์–ด๋–ป๊ฒŒ ๋™์ž‘ํ•ฉ๋‹ˆ๊นŒ

๊ฐ ์ฒญํฌ๋Š” ์ •ํ™•ํ•œ ์‚ฌ์ดํด์„ ๋”ฐ๋ฆ…๋‹ˆ๋‹ค. ์„ค์ •ํ•œ ํฌ๊ธฐ์— ๋„๋‹ฌํ•  ๋•Œ๊นŒ์ง€ ์•„์ดํ…œ์„ ํ•œ ๊ฑด์”ฉ ์ฝ๊ณ , ๊ฐ ์•„์ดํ…œ์„ ๊ฐœ๋ณ„์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•œ ๋’ค, ๊ทธ๋ฃน์„ ํ•œ ๋ฒˆ์— ๊ธฐ๋กํ•ฉ๋‹ˆ๋‹ค. ํŠธ๋žœ์žญ์…˜์ด ์ฒญํฌ ์ „์ฒด๋ฅผ ๊ฐ์Œ‰๋‹ˆ๋‹ค.

OrderItemReader.javajava
// ItemReader: ํ•œ ๋ฒˆ์— ํ•œ ์•„์ดํ…œ์”ฉ ์ฝ์Œ
@StepScope
@Component
public class OrderItemReader implements ItemReader<OrderRecord> {

    // @StepScope: Step ์‹คํ–‰๋งˆ๋‹ค ์ƒˆ ์ธ์Šคํ„ด์Šค
    // ๋™์ ์ธ ์žก ํŒŒ๋ผ๋ฏธํ„ฐ ์ฃผ์ž…์„ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•จ
    @Value("#{jobParameters['startDate']}")
    private LocalDate startDate;

    private Iterator<OrderRecord> orderIterator;

    @BeforeStep
    public void initializeReader(StepExecution stepExecution) {
        // Step ์‹œ์ž‘ ์‹œ ๋ฐ์ดํ„ฐ ๋กœ๋”ฉ
        List<OrderRecord> orders = fetchOrdersFromDate(startDate);
        this.orderIterator = orders.iterator();
    }

    @Override
    public OrderRecord read() {
        // ๋ฐ์ดํ„ฐ ๋์„ ์•Œ๋ฆฌ๊ธฐ ์œ„ํ•ด null ๋ฐ˜ํ™˜
        // Spring Batch๋Š” null์„ ๋ฐ›์„ ๋•Œ๊นŒ์ง€ read()๋ฅผ ํ˜ธ์ถœ
        if (orderIterator.hasNext()) {
            return orderIterator.next();
        }
        return null;  // ๋ฐ์ดํ„ฐ์…‹์˜ ๋
    }

    private List<OrderRecord> fetchOrdersFromDate(LocalDate date) {
        // ๋ฐ์ดํ„ฐ ์†Œ์Šค์—์„œ ์กฐํšŒ
        return List.of();  // ์‹ค์ œ ๊ตฌํ˜„
    }
}
OrderItemProcessor.javajava
// ItemProcessor: ๊ฐ ์•„์ดํ…œ์„ ๊ฐœ๋ณ„์ ์œผ๋กœ ๋ณ€ํ™˜
@Component
public class OrderItemProcessor implements ItemProcessor<OrderRecord, ProcessedOrder> {

    private final PricingService pricingService;
    private final ValidationService validationService;

    public OrderItemProcessor(PricingService pricingService,
                               ValidationService validationService) {
        this.pricingService = pricingService;
        this.validationService = validationService;
    }

    @Override
    public ProcessedOrder process(OrderRecord item) {
        // null ๋ฐ˜ํ™˜์€ ์•„์ดํ…œ์„ ํ•„ํ„ฐ๋ง(๊ธฐ๋ก๋˜์ง€ ์•Š์Œ)
        if (!validationService.isValid(item)) {
            return null;  // ์•„์ดํ…œ ํ•„ํ„ฐ๋ง
        }

        // ๋น„์ฆˆ๋‹ˆ์Šค ๋ณ€ํ™˜
        BigDecimal finalPrice = pricingService.calculatePrice(item);

        return new ProcessedOrder(
                item.orderId(),
                item.customerId(),
                finalPrice,
                LocalDateTime.now()
        );
    }
}
OrderItemWriter.javajava
// ItemWriter: ์ฒญํฌ ์ „์ฒด๋ฅผ ํ•œ ๋ฒˆ์— ๊ธฐ๋ก
@Component
public class OrderItemWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;

    public OrderItemWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // ์ฒญํฌ๋Š” ์ฒ˜๋ฆฌ๋œ ๋ชจ๋“  ์•„์ดํ…œ์„ ํฌํ•จ
        // ์„ฑ๋Šฅ ์ตœ์ ํ™”๋ฅผ ์œ„ํ•œ ๋ฐฐ์น˜ ๊ธฐ๋ก
        List<? extends ProcessedOrder> items = chunk.getItems();

        jdbcTemplate.batchUpdate(
                "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)",
                items,
                items.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                }
        );
    }
}

์ฒญํฌ ์ฒ˜๋ฆฌ ์ค‘ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ํŠธ๋žœ์žญ์…˜์€ ๋กค๋ฐฑ๋ฉ๋‹ˆ๋‹ค. ์ดํ›„ ์žก์€ JobRepository์— ์ €์žฅ๋œ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•ด ํ•ด๋‹น ์ฒญํฌ๋ถ€ํ„ฐ ์žฌ๊ฐœํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์งˆ๋ฌธ 4: ์ตœ์ ์˜ ์ฒญํฌ ํฌ๊ธฐ๋Š” ์–ด๋–ป๊ฒŒ ์„ ํƒํ•ฉ๋‹ˆ๊นŒ

์ฒญํฌ ํฌ๊ธฐ๋Š” ์„ฑ๋Šฅ๊ณผ ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ์— ์ง์ ‘ ์˜ํ–ฅ์„ ์ค๋‹ˆ๋‹ค. ๋„ˆ๋ฌด ์ž‘์œผ๋ฉด ์ปค๋ฐ‹์ด ๋งŽ์•„์ ธ ์˜ค๋ฒ„ํ—ค๋“œ๊ฐ€ ์ปค์ง€๊ณ , ๋„ˆ๋ฌด ํฌ๋ฉด ๋ฉ”๋ชจ๋ฆฌ๋ฅผ ๊ณผ๋„ํ•˜๊ฒŒ ์‚ฌ์šฉํ•˜๋ฉฐ ์žฅ์•  ์‹œ ๋กค๋ฐฑ ์‹œ๊ฐ„์ด ๊ธธ์–ด์ง‘๋‹ˆ๋‹ค.

ChunkSizingConfig.javajava
// ์ฒญํฌ ํฌ๊ธฐ์˜ ๋™์  ์„ค์ •
@Configuration
public class ChunkSizingConfig {

    // ๋Œ€๋ถ€๋ถ„ ์ƒํ™ฉ์—์„œ ํ•ฉ๋ฆฌ์ ์ธ ๊ธฐ๋ณธ๊ฐ’
    private static final int DEFAULT_CHUNK_SIZE = 100;

    // ๊ฐ€๋ฒผ์šด ์•„์ดํ…œ(ํ•„๋“œ๊ฐ€ ์ ์€ ๊ฒฝ์šฐ)
    private static final int LIGHT_ITEMS_CHUNK_SIZE = 500;

    // ๋ฌด๊ฑฐ์šด ์•„์ดํ…œ(blob, ๋ฌธ์„œ)
    private static final int HEAVY_ITEMS_CHUNK_SIZE = 25;

    @Bean
    public Step processLightDataStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<LightRecord> reader,
                                      ItemWriter<LightRecord> writer) {
        return new StepBuilder("processLightDataStep", jobRepository)
                // ๊ฐ€๋ฒผ์šด ์•„์ดํ…œ: ์ปค๋ฐ‹ ํšŸ์ˆ˜๋ฅผ ์ค„์ด๊ธฐ ์œ„ํ•ด ๋” ํฐ ์ฒญํฌ
                .<LightRecord, LightRecord>chunk(LIGHT_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .writer(writer)
                .build();
    }

    @Bean
    public Step processDocumentsStep(JobRepository jobRepository,
                                      PlatformTransactionManager txManager,
                                      ItemReader<Document> reader,
                                      ItemProcessor<Document, ProcessedDocument> processor,
                                      ItemWriter<ProcessedDocument> writer) {
        return new StepBuilder("processDocumentsStep", jobRepository)
                // ๋ฌด๊ฑฐ์šด ๋ฌธ์„œ: ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ์„ ์ œํ•œํ•˜๊ธฐ ์œ„ํ•ด ์ž‘์€ ์ฒญํฌ
                .<Document, ProcessedDocument>chunk(HEAVY_ITEMS_CHUNK_SIZE, txManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
์‹ค์šฉ์ ์ธ ๊ธฐ์ค€

์ฒญํฌ๋‹น 100๊ฑด์œผ๋กœ ์‹œ์ž‘ํ•œ ๋’ค ์ง€ํ‘œ(์ปค๋ฐ‹ ์‹œ๊ฐ„, ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ๋Ÿ‰, ๋กค๋ฐฑ ์‹œ๊ฐ„)์— ๋”ฐ๋ผ ์กฐ์ •ํ•ฉ๋‹ˆ๋‹ค. ๋ฆฌ์Šค๋„ˆ๋กœ ๋ชจ๋‹ˆํ„ฐ๋งํ•ด ์ตœ์ ์ ์„ ์ฐพ์œผ์‹ญ์‹œ์˜ค.

๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ํŒŒํ‹ฐ์…”๋‹

์งˆ๋ฌธ 5: ํŒŒํ‹ฐ์…”๋‹์ด๋ž€ ๋ฌด์—‡์ด๋ฉฐ ์–ธ์ œ ์‚ฌ์šฉํ•ฉ๋‹ˆ๊นŒ

ํŒŒํ‹ฐ์…”๋‹์€ ๋ฐ์ดํ„ฐ์…‹์„ ๋…๋ฆฝ์ ์ธ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋‚˜๋ˆ„์–ด ๋ณ‘๋ ฌ๋กœ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค. ๊ฐ ํŒŒํ‹ฐ์…˜์€ ์ž์ฒด ์Šค๋ ˆ๋“œ(๋กœ์ปฌ) ๋˜๋Š” ์›๊ฒฉ worker์—์„œ ์‹คํ–‰๋ฉ๋‹ˆ๋‹ค. ์ด ๋ฐฉ์‹์€ ์žฌ์‹œ์ž‘ ๋Šฅ๋ ฅ์„ ์žƒ์ง€ ์•Š์œผ๋ฉด์„œ ์ฒ˜๋ฆฌ๋Ÿ‰์„ ํฌ๊ฒŒ ๋Š˜๋ฆฝ๋‹ˆ๋‹ค.

PartitionedJobConfig.javajava
// ํŒŒํ‹ฐ์…”๋‹๋œ ์žก ์„ค์ •
@Configuration
public class PartitionedJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public PartitionedJobConfig(JobRepository jobRepository,
                                 PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    @Bean
    public Job partitionedImportJob(Step partitionedStep) {
        return new JobBuilder("partitionedImportJob", jobRepository)
                .start(partitionedStep)
                .build();
    }

    // ๋งค๋‹ˆ์ € Step: ํŒŒํ‹ฐ์…˜์„ ์กฐ์œจ
    @Bean
    public Step partitionedStep(Partitioner partitioner,
                                 Step workerStep,
                                 TaskExecutor taskExecutor) {
        return new StepBuilder("partitionedStep", jobRepository)
                // Partitioner๋ฅผ ํ†ตํ•ด ์ž‘์—… ๋ถ„ํ• 
                .partitioner("workerStep", partitioner)
                // ๊ฐ ํŒŒํ‹ฐ์…˜๋งˆ๋‹ค ์‹คํ–‰ํ•  Step
                .step(workerStep)
                // 8๊ฐœ ๋ณ‘๋ ฌ ์Šค๋ ˆ๋“œ
                .taskExecutor(taskExecutor)
                // ์ƒ์„ฑํ•  ํŒŒํ‹ฐ์…˜ ์ˆ˜
                .gridSize(8)
                .build();
    }

    // ๋ณ‘๋ ฌ ์‹คํ–‰์„ ์œ„ํ•œ TaskExecutor
    @Bean
    public TaskExecutor batchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("batch-partition-");
        executor.initialize();
        return executor;
    }
}
RangePartitioner.javajava
// ID ๋ฒ”์œ„ ๊ธฐ๋ฐ˜ Partitioner
@Component
public class RangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public RangePartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // ๋ฐ์ดํ„ฐ์…‹์˜ ๊ฒฝ๊ณ„๋ฅผ ์กฐํšŒ
        Long minId = jdbcTemplate.queryForObject(
                "SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
        Long maxId = jdbcTemplate.queryForObject(
                "SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);

        if (minId == null || maxId == null) {
            return Map.of();  // ์ฒ˜๋ฆฌํ•  ๋ฐ์ดํ„ฐ ์—†์Œ
        }

        // ๊ฐ ํŒŒํ‹ฐ์…˜์˜ ํฌ๊ธฐ๋ฅผ ๊ณ„์‚ฐ
        long range = (maxId - minId) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new HashMap<>();

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            long start = minId + (i * range);
            long end = Math.min(start + range - 1, maxId);

            // ๊ฐ ํŒŒํ‹ฐ์…˜์ด ์ž์‹ ์˜ ๊ฒฝ๊ณ„๋ฅผ ๋ฐ›์Œ
            context.putLong("minId", start);
            context.putLong("maxId", end);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

ํŒŒํ‹ฐ์…”๋‹์€ ์•„์ดํ…œ์ด ์„œ๋กœ ๋…๋ฆฝ์ ์ธ ํฐ ๋ฐ์ดํ„ฐ์…‹์— ์ ํ•ฉํ•ฉ๋‹ˆ๋‹ค. ๋А๋ฆฐ ํŒŒํ‹ฐ์…˜์ด ์ „์ฒด ์žก์„ ์ง€์—ฐ์‹œํ‚ค์ง€ ์•Š๋„๋ก ๊ท ํ˜•์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค.

์งˆ๋ฌธ 6: ๋กœ์ปฌ ํŒŒํ‹ฐ์…”๋‹๊ณผ ์›๊ฒฉ ํŒŒํ‹ฐ์…”๋‹์˜ ์ฐจ์ด๋Š” ๋ฌด์—‡์ž…๋‹ˆ๊นŒ

๋กœ์ปฌ ํŒŒํ‹ฐ์…”๋‹์€ ๋™์ผ JVM์˜ ์Šค๋ ˆ๋“œ ํ’€์—์„œ ๋ชจ๋“  ํŒŒํ‹ฐ์…˜์„ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค. ์›๊ฒฉ ํŒŒํ‹ฐ์…”๋‹์€ ๋ฉ”์‹œ์ง• ๋ฏธ๋“ค์›จ์–ด๋ฅผ ํ†ตํ•ด ์—ฌ๋Ÿฌ JVM(worker)์œผ๋กœ ํŒŒํ‹ฐ์…˜์„ ๋ถ„์‚ฐํ•ฉ๋‹ˆ๋‹ค.

RemotePartitioningConfig.javajava
// ๋ฉ”์‹œ์ง•์„ ํ™œ์šฉํ•œ ์›๊ฒฉ ํŒŒํ‹ฐ์…”๋‹ ์„ค์ •
@Configuration
public class RemotePartitioningConfig {

    @Bean
    public Step managerStep(JobRepository jobRepository,
                             Partitioner partitioner,
                             MessageChannelPartitionHandler partitionHandler) {
        return new StepBuilder("managerStep", jobRepository)
                .partitioner("workerStep", partitioner)
                // ์›๊ฒฉ worker์™€ ํ†ต์‹ ํ•˜๋Š” ํ•ธ๋“ค๋Ÿฌ
                .partitionHandler(partitionHandler)
                .build();
    }

    // PartitionHandler๊ฐ€ ExecutionContext๋ฅผ worker๋กœ ์ „์†ก
    @Bean
    public MessageChannelPartitionHandler partitionHandler(
            MessagingTemplate messagingTemplate,
            JobExplorer jobExplorer) {
        MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
        handler.setStepName("workerStep");
        handler.setGridSize(4);
        handler.setMessagingOperations(messagingTemplate);
        handler.setJobExplorer(jobExplorer);
        // worker ์™„๋ฃŒ ๋Œ€๊ธฐ ํƒ€์ž„์•„์›ƒ
        handler.setPollInterval(5000L);
        return handler;
    }
}
WorkerConfiguration.javajava
// worker ์ธก ์„ค์ •
@Configuration
public class WorkerConfiguration {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public WorkerConfiguration(JobRepository jobRepository,
                                PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    // worker๋Š” ํŒŒํ‹ฐ์…˜์„ ๋ฐ›์•„ Step์„ ์‹คํ–‰
    @Bean
    public Step workerStep(ItemReader<OrderRecord> reader,
                            ItemProcessor<OrderRecord, ProcessedOrder> processor,
                            ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("workerStep", jobRepository)
                .<OrderRecord, ProcessedOrder>chunk(100, transactionManager)
                // ํŒŒํ‹ฐ์…˜ ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ๋ฐ›๊ธฐ ์œ„ํ•ด @StepScope ์ ์šฉ
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    // ํŒŒํ‹ฐ์…˜ ๊ฒฝ๊ณ„๋ฅผ ์‚ฌ์šฉํ•˜๋Š” Reader
    @Bean
    @StepScope
    public JdbcCursorItemReader<OrderRecord> partitionedReader(
            DataSource dataSource,
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {
        return new JdbcCursorItemReaderBuilder<OrderRecord>()
                .name("partitionedOrderReader")
                .dataSource(dataSource)
                .sql("SELECT * FROM orders WHERE id BETWEEN ? AND ? AND status = 'PENDING'")
                .preparedStatementSetter(ps -> {
                    ps.setLong(1, minId);
                    ps.setLong(2, maxId);
                })
                .rowMapper(new OrderRecordRowMapper())
                .build();
    }
}

Spring Boot ๋ฉด์ ‘ ์ค€๋น„๊ฐ€ ๋˜์…จ๋‚˜์š”?

์ธํ„ฐ๋ž™ํ‹ฐ๋ธŒ ์‹œ๋ฎฌ๋ ˆ์ดํ„ฐ, flashcards, ๊ธฐ์ˆ  ํ…Œ์ŠคํŠธ๋กœ ์—ฐ์Šตํ•˜์„ธ์š”.

์žฅ์•  ํ—ˆ์šฉ๊ณผ ๋ณต๊ตฌ

์งˆ๋ฌธ 7: Spring Batch๊ฐ€ ์ œ๊ณตํ•˜๋Š” ์žฅ์•  ํ—ˆ์šฉ ๋ฉ”์ปค๋‹ˆ์ฆ˜์€ ๋ฌด์—‡์ž…๋‹ˆ๊นŒ

Spring Batch๋Š” ์ƒํ˜ธ ๋ณด์™„์ ์ธ ์„ธ ๊ฐ€์ง€ ๋ฉ”์ปค๋‹ˆ์ฆ˜์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. skip(์‹คํŒจ ์•„์ดํ…œ ๋ฌด์‹œ), retry(์ž๋™ ์žฌ์‹œ๋„), restart(์‹คํŒจํ•œ ์žก ์žฌ๊ฐœ)์ž…๋‹ˆ๋‹ค. ์ด ๋ฉ”์ปค๋‹ˆ์ฆ˜์€ Step ์ˆ˜์ค€์—์„œ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค.

FaultTolerantStepConfig.javajava
// ์™„์ „ํ•œ ์žฅ์•  ํ—ˆ์šฉ ์„ค์ •
@Configuration
public class FaultTolerantStepConfig {

    @Bean
    public Step faultTolerantStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<DataRecord> reader,
                                   ItemProcessor<DataRecord, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   SkipPolicy customSkipPolicy) {
        return new StepBuilder("faultTolerantStep", jobRepository)
                .<DataRecord, ProcessedRecord>chunk(100, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // ์žฅ์•  ํ—ˆ์šฉ ๋ชจ๋“œ ํ™œ์„ฑํ™”
                .faultTolerant()
                // SKIP: ๊ฒ€์ฆ ์˜ค๋ฅ˜๋ฅผ ์ตœ๋Œ€ 10๊ฑด๊นŒ์ง€ ๋ฌด์‹œ
                .skipLimit(10)
                .skip(ValidationException.class)
                .skip(DataIntegrityViolationException.class)
                // ์ผ๋ถ€ ์˜ค๋ฅ˜๋Š” ์ ˆ๋Œ€ ์Šคํ‚ตํ•˜์ง€ ์•Š์Œ
                .noSkip(FatalBatchException.class)
                // RETRY: ์ผ์‹œ์  ์˜ค๋ฅ˜๋Š” ์žฌ์‹œ๋„
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .retry(DeadlockLoserDataAccessException.class)
                // ์žฌ์‹œ๋„ ์‚ฌ์ด์˜ ์ง€์ˆ˜์  ๋ฐฑ์˜คํ”„
                .backOffPolicy(exponentialBackOffPolicy())
                // ์Šคํ‚ต์„ ๋กœ๊น…ํ•˜๋Š” ๋ฆฌ์Šค๋„ˆ
                .listener(skipListener())
                .build();
    }

    @Bean
    public BackOffPolicy exponentialBackOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);  // 1์ดˆ
        policy.setMultiplier(2.0);         // ์žฌ์‹œ๋„๋งˆ๋‹ค ๋‘ ๋ฐฐ
        policy.setMaxInterval(10000);      // ์ตœ๋Œ€ 10์ดˆ
        return policy;
    }

    @Bean
    public SkipListener<DataRecord, ProcessedRecord> skipListener() {
        return new SkipListener<>() {
            @Override
            public void onSkipInRead(Throwable t) {
                // ์ฝ์„ ์ˆ˜ ์—†๋Š” ์•„์ดํ…œ ๋กœ๊น…
            }

            @Override
            public void onSkipInProcess(DataRecord item, Throwable t) {
                // ์ฒ˜๋ฆฌ์— ์‹คํŒจํ•œ ์•„์ดํ…œ ๋กœ๊น…
            }

            @Override
            public void onSkipInWrite(ProcessedRecord item, Throwable t) {
                // ๊ธฐ๋ก์— ์‹คํŒจํ•œ ์•„์ดํ…œ ๋กœ๊น…
            }
        };
    }
}

์žฌ์‹œ๋„๋Š” ์ผ์‹œ์  ์˜ค๋ฅ˜(๋„คํŠธ์›Œํฌ ํƒ€์ž„์•„์›ƒ, DB ๋ฐ๋“œ๋ฝ)์— ์ ํ•ฉํ•ฉ๋‹ˆ๋‹ค. ์Šคํ‚ต์€ ์ „์ฒด ์ฒ˜๋ฆฌ๋ฅผ ๋ง‰์ง€ ์•Š์•„์•ผ ํ•˜๋Š” ๊ฐœ๋ณ„ ๋ฐ์ดํ„ฐ ์˜ค๋ฅ˜์— ์•Œ๋งž์Šต๋‹ˆ๋‹ค.

์งˆ๋ฌธ 8: ์ปค์Šคํ…€ SkipPolicy๋ฅผ ์–ด๋–ป๊ฒŒ ๊ตฌํ˜„ํ•ฉ๋‹ˆ๊นŒ

์ปค์Šคํ…€ SkipPolicy๋Š” ์„ธ๋ฐ€ํ•œ ์˜์‚ฌ ๊ฒฐ์ •์„ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ์™ธ ํƒ€์ž…, ์˜ค๋ฅ˜ ์ˆ˜, ํŠน์ • ๋น„์ฆˆ๋‹ˆ์Šค ๊ธฐ์ค€์— ๋”ฐ๋ผ ์Šคํ‚ตํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

AdaptiveSkipPolicy.javajava
// ๊ณ ๊ธ‰ ๋น„์ฆˆ๋‹ˆ์Šค ๋กœ์ง์„ ๊ฐ€์ง„ SkipPolicy
@Component
public class AdaptiveSkipPolicy implements SkipPolicy {

    private static final int MAX_SKIP_COUNT = 100;
    private static final double MAX_SKIP_PERCENTAGE = 0.05;  // ์ตœ๋Œ€ 5%

    private final AtomicInteger totalProcessed = new AtomicInteger(0);
    private final AtomicInteger skipCount = new AtomicInteger(0);

    @Override
    public boolean shouldSkip(Throwable exception, long skipCountSoFar) {
        // ์น˜๋ช…์  ์˜ค๋ฅ˜๋Š” ์ ˆ๋Œ€ ์Šคํ‚ตํ•˜์ง€ ์•Š์Œ
        if (exception instanceof FatalBatchException
                || exception instanceof OutOfMemoryError) {
            return false;
        }

        // ์Šคํ‚ต ์ˆ˜์˜ ์ ˆ๋Œ€ ์ƒํ•œ
        if (skipCountSoFar >= MAX_SKIP_COUNT) {
            return false;  // ์žก ์ค‘๋‹จ
        }

        // ๋น„์œจ ์ƒํ•œ
        int total = totalProcessed.get();
        if (total > 1000) {  // ์›Œ๋ฐ์—… ํ›„์—๋งŒ ์ ์šฉ
            double skipPercentage = (double) skipCountSoFar / total;
            if (skipPercentage > MAX_SKIP_PERCENTAGE) {
                return false;  // ๋น„์œจ์ ์œผ๋กœ ์˜ค๋ฅ˜๊ฐ€ ๋„ˆ๋ฌด ๋งŽ์Œ
            }
        }

        // ๊ฒ€์ฆ ๋ฐ ๋ฐ์ดํ„ฐ ์˜ค๋ฅ˜ ์Šคํ‚ต
        return exception instanceof ValidationException
                || exception instanceof DataFormatException
                || exception instanceof IllegalArgumentException;
    }

    // ์ง„ํ–‰ ์ถ”์ ์„ ์œ„ํ•ด ๋ฆฌ์Šค๋„ˆ์—์„œ ํ˜ธ์ถœ
    public void incrementProcessed() {
        totalProcessed.incrementAndGet();
    }
}

์งˆ๋ฌธ 9: ์‹คํŒจํ•œ ์žก์˜ ์žฌ์‹œ์ž‘์€ ์–ด๋–ป๊ฒŒ ๋™์ž‘ํ•ฉ๋‹ˆ๊นŒ

JobRepository๋Š” ๊ฐ ์‹คํ–‰ ์ƒํƒœ๋ฅผ ์ €์žฅํ•ฉ๋‹ˆ๋‹ค. ์žฌ์‹œ์ž‘ ์‹œ Spring Batch๋Š” ๋งˆ์ง€๋ง‰์œผ๋กœ ์ปค๋ฐ‹๋œ ์ฒญํฌ๋ฅผ ์‹๋ณ„ํ•˜๊ณ  ๊ทธ ์ง€์ ๋ถ€ํ„ฐ ์žฌ๊ฐœํ•ฉ๋‹ˆ๋‹ค. ์„ฑ๊ณต์ ์œผ๋กœ ์ฒ˜๋ฆฌ๋œ ์•„์ดํ…œ์€ ๋‹ค์‹œ ์ฒ˜๋ฆฌ๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

JobRestartService.javajava
// ์žก ์žฌ์‹œ์ž‘ ๊ด€๋ฆฌ ์„œ๋น„์Šค
@Service
public class JobRestartService {

    private final JobLauncher jobLauncher;
    private final JobExplorer jobExplorer;
    private final JobRepository jobRepository;
    private final Job importJob;

    public JobRestartService(JobLauncher jobLauncher,
                              JobExplorer jobExplorer,
                              JobRepository jobRepository,
                              @Qualifier("importJob") Job importJob) {
        this.jobLauncher = jobLauncher;
        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
        this.importJob = importJob;
    }

    public JobExecution restartFailedJob(Long jobExecutionId) throws Exception {
        // ์‹คํŒจํ•œ ์‹คํ–‰ ์กฐํšŒ
        JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);

        if (failedExecution == null) {
            throw new IllegalArgumentException("Job execution not found: " + jobExecutionId);
        }

        // ์žก ์žฌ์‹œ์ž‘ ๊ฐ€๋Šฅ ์—ฌ๋ถ€ ํ™•์ธ
        if (!failedExecution.getStatus().equals(BatchStatus.FAILED)) {
            throw new IllegalStateException("Only FAILED jobs can be restarted");
        }

        // ์›๋ž˜ ์‹คํ–‰๊ณผ ๋™์ผํ•œ ํŒŒ๋ผ๋ฏธํ„ฐ ์‚ฌ์šฉ
        JobParameters originalParams = failedExecution.getJobParameters();

        // ์žก ์žฌ์‹คํ–‰ - ๋งˆ์ง€๋ง‰ ์ฒดํฌํฌ์ธํŠธ์—์„œ ์ž๋™ ์žฌ๊ฐœ
        return jobLauncher.run(importJob, originalParams);
    }

    public List<JobExecution> findRestartableJobs() {
        // ์•„์ง ์žฌ์‹œ์ž‘๋˜์ง€ ์•Š์€ ๋ชจ๋“  FAILED ์‹คํ–‰ ๋‚˜์—ด
        return jobExplorer.findJobInstancesByJobName(importJob.getName(), 0, 100)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.FAILED)
                .filter(this::isRestartable)
                .toList();
    }

    private boolean isRestartable(JobExecution execution) {
        // ๋” ์ตœ์‹ ์˜ ์„ฑ๊ณต ์‹คํ–‰์ด ์—†๋Š”์ง€ ํ™•์ธ
        JobInstance instance = execution.getJobInstance();
        return jobExplorer.getJobExecutions(instance).stream()
                .noneMatch(exec -> exec.getStatus() == BatchStatus.COMPLETED);
    }
}
๋ฉด์ ‘์˜ ํ•จ์ •

์žก์€ JobParameters๊ฐ€ ๋™์ผํ•ด์•ผ๋งŒ ์žฌ์‹œ์ž‘ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ๋ณ€๊ฒฝํ•˜๋ฉด ์ƒˆ๋กœ์šด ์žก ์ธ์Šคํ„ด์Šค๊ฐ€ ์ƒ์„ฑ๋˜์–ด ์ง„ํ–‰ ์ด๋ ฅ์ด ์‚ฌ๋ผ์ง‘๋‹ˆ๋‹ค.

ํ™•์žฅ๊ณผ ์ตœ์ ํ™”

์งˆ๋ฌธ 10: ์–ด๋–ค ํ™•์žฅ ์ „๋žต์ด ์žˆ์Šต๋‹ˆ๊นŒ

Spring Batch๋Š” ๋„ค ๊ฐ€์ง€ ์ „๋žต์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. multi-threaded step(์—ฌ๋Ÿฌ ์Šค๋ ˆ๋“œ๊ฐ€ ๋ณ‘๋ ฌ๋กœ ์ฝ๊ธฐ), parallel steps(๋…๋ฆฝ Step์˜ ๋ณ‘๋ ฌ ์‹คํ–‰), remote chunking(๋ถ„์‚ฐ ์ฒ˜๋ฆฌ), partitioning(๋ฐ์ดํ„ฐ ๋ถ„์‚ฐ)์ž…๋‹ˆ๋‹ค.

MultiThreadedStepConfig.javajava
// ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ Step: ์—ฌ๋Ÿฌ ์Šค๋ ˆ๋“œ๊ฐ€ ๊ฐ™์€ ๋ฐ์ดํ„ฐ์…‹์„ ์ฒ˜๋ฆฌ
@Configuration
public class MultiThreadedStepConfig {

    @Bean
    public Step multiThreadedStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader<Record> reader,
                                   ItemProcessor<Record, ProcessedRecord> processor,
                                   ItemWriter<ProcessedRecord> writer,
                                   TaskExecutor taskExecutor) {
        return new StepBuilder("multiThreadedStep", jobRepository)
                .<Record, ProcessedRecord>chunk(100, transactionManager)
                // ์ฃผ์˜: reader๋Š” thread-safeํ•ด์•ผ ํ•จ
                .reader(synchronizedReader(reader))
                .processor(processor)
                .writer(writer)
                // 4๊ฐœ ์Šค๋ ˆ๋“œ๊ฐ€ ์ฒญํฌ๋ฅผ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ
                .taskExecutor(taskExecutor)
                .throttleLimit(4)
                .build();
    }

    // reader๋ฅผ thread-safe๋กœ ๋งŒ๋“œ๋Š” ๋ž˜ํผ
    private ItemReader<Record> synchronizedReader(ItemReader<Record> reader) {
        SynchronizedItemStreamReader<Record> syncReader = new SynchronizedItemStreamReader<>();
        syncReader.setDelegate((ItemStreamReader<Record>) reader);
        return syncReader;
    }
}
ParallelStepsConfig.javajava
// ๋…๋ฆฝ Step์„ ๋ณ‘๋ ฌ ์‹คํ–‰
@Configuration
public class ParallelStepsConfig {

    @Bean
    public Job parallelJob(JobRepository jobRepository,
                            Step loadCustomersStep,
                            Step loadProductsStep,
                            Step loadOrdersStep,
                            Step processDataStep) {
        // ๋ณ‘๋ ฌ ํ”Œ๋กœ์šฐ: customers์™€ products๋ฅผ ๋™์‹œ์— ์ ์žฌ
        Flow loadCustomersFlow = new FlowBuilder<Flow>("loadCustomersFlow")
                .start(loadCustomersStep)
                .build();

        Flow loadProductsFlow = new FlowBuilder<Flow>("loadProductsFlow")
                .start(loadProductsStep)
                .build();

        Flow loadOrdersFlow = new FlowBuilder<Flow>("loadOrdersFlow")
                .start(loadOrdersStep)
                .build();

        // Split์ด ํ”Œ๋กœ์šฐ๋ฅผ ๋ณ‘๋ ฌ ์‹คํ–‰
        return new JobBuilder("parallelJob", jobRepository)
                .start(new FlowBuilder<Flow>("parallelLoadFlow")
                        .split(new SimpleAsyncTaskExecutor())
                        .add(loadCustomersFlow, loadProductsFlow, loadOrdersFlow)
                        .build())
                // ๋ณ‘๋ ฌ ์ ์žฌ ํ›„ ์ˆœ์ฐจ ์ฒ˜๋ฆฌ
                .next(processDataStep)
                .build()
                .build();
    }
}

๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋”ฉ์€ reader๋ฅผ ๋™๊ธฐํ™”ํ•  ์ˆ˜ ์žˆ๋Š” ๊ฒฝ์šฐ์— ์ ํ•ฉํ•ฉ๋‹ˆ๋‹ค. ๋Œ€์šฉ๋Ÿ‰์—์„œ๋Š” ๊ฐ ํŒŒํ‹ฐ์…˜์ด ์ž์ฒด reader๋ฅผ ๊ฐ€์ง€๋ฏ€๋กœ ๊ฒฝํ•ฉ์ด ์—†๋Š” ํŒŒํ‹ฐ์…”๋‹์ด ๋” ์ข‹์Šต๋‹ˆ๋‹ค.

์งˆ๋ฌธ 11: ์žก ์„ฑ๋Šฅ์„ ์–ด๋–ป๊ฒŒ ๋ชจ๋‹ˆํ„ฐ๋งํ•ฉ๋‹ˆ๊นŒ

Spring Batch๋Š” ๋ฆฌ์Šค๋„ˆ์™€ JobRepository๋กœ ๋ฉ”ํŠธ๋ฆญ์„ ๋…ธ์ถœํ•ฉ๋‹ˆ๋‹ค. Micrometer ํ†ตํ•ฉ์œผ๋กœ Prometheus, Grafana ๋“ฑ ๋‹ค์–‘ํ•œ ๋ชจ๋‹ˆํ„ฐ๋ง ์‹œ์Šคํ…œ์— ๋‚ด๋ณด๋‚ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

BatchMetricsConfig.javajava
// Micrometer ๊ธฐ๋ฐ˜ ๋ชจ๋‹ˆํ„ฐ๋ง ์„ค์ •
@Configuration
public class BatchMetricsConfig {

    private final MeterRegistry meterRegistry;

    public BatchMetricsConfig(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Bean
    public JobExecutionListener metricsJobListener() {
        return new JobExecutionListener() {

            private Timer.Sample jobTimer;

            @Override
            public void beforeJob(JobExecution jobExecution) {
                // ์žก ์†Œ์š” ์‹œ๊ฐ„ ํƒ€์ด๋จธ ์‹œ์ž‘
                jobTimer = Timer.start(meterRegistry);
                Counter.builder("batch.job.started")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .register(meterRegistry)
                        .increment();
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                // ์ด ์†Œ์š” ์‹œ๊ฐ„ ๊ธฐ๋ก
                jobTimer.stop(Timer.builder("batch.job.duration")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry));

                // ์ƒํƒœ๋ณ„ ์žก ์นด์šดํ„ฐ
                Counter.builder("batch.job.completed")
                        .tag("job", jobExecution.getJobInstance().getJobName())
                        .tag("status", jobExecution.getStatus().toString())
                        .register(meterRegistry)
                        .increment();
            }
        };
    }

    @Bean
    public StepExecutionListener metricsStepListener() {
        return new StepExecutionListener() {

            @Override
            public void afterStep(StepExecution stepExecution) {
                String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
                String stepName = stepExecution.getStepName();

                // ์ฒ˜๋ฆฌ๋Ÿ‰ ๋ฉ”ํŠธ๋ฆญ
                Gauge.builder("batch.step.read.count", stepExecution, StepExecution::getReadCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.write.count", stepExecution, StepExecution::getWriteCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                Gauge.builder("batch.step.skip.count", stepExecution, StepExecution::getSkipCount)
                        .tag("job", jobName)
                        .tag("step", stepName)
                        .register(meterRegistry);

                return null;
            }
        };
    }
}

์งˆ๋ฌธ 12: ํŒŒํ‹ฐ์…”๋‹์˜ ํ”ํ•œ ํ•จ์ •์€ ๋ฌด์—‡์ž…๋‹ˆ๊นŒ

์ž์ฃผ ๋ฐœ์ƒํ•˜๋Š” ์‹ค์ˆ˜์—๋Š” ๋ถˆ๊ท ํ˜•ํ•œ ํŒŒํ‹ฐ์…˜(ํ•œ ํŒŒํ‹ฐ์…˜์— ๋ฐ์ดํ„ฐ์˜ 90%๊ฐ€ ๋ชฐ๋ฆผ), thread-safeํ•˜์ง€ ์•Š์€ reader, ํŒŒํ‹ฐ์…˜ ๊ฐ„ ์ž˜๋ชป๋œ ์ƒํƒœ ๊ด€๋ฆฌ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

BalancedPartitioner.javajava
// ๋ถ€ํ•˜๋ฅผ ์‹ค์ œ๋กœ ๊ท ํ˜• ์žก๋Š” Partitioner
@Component
public class BalancedPartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public BalancedPartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // ์ฒ˜๋ฆฌํ•  ์ „์ฒด ์•„์ดํ…œ ์ˆ˜ ๊ณ„์‚ฐ
        Integer totalCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM orders WHERE status = 'PENDING'", Integer.class);

        if (totalCount == null || totalCount == 0) {
            return Map.of();
        }

        // ํŒŒํ‹ฐ์…˜๋‹น ๋ชฉํ‘œ ํฌ๊ธฐ ๊ณ„์‚ฐ
        int itemsPerPartition = (int) Math.ceil((double) totalCount / gridSize);

        Map<String, ExecutionContext> partitions = new HashMap<>();

        // ๊ท ํ˜• ์žกํžŒ ํŒŒํ‹ฐ์…˜์„ ์œ„ํ•ด OFFSET/LIMIT ์‚ฌ์šฉ
        // ๋ฒ”์œ„ ๋ถ„ํ• ๋ณด๋‹ค ๋น„์šฉ์€ ํฌ์ง€๋งŒ ๊ท ํ˜•์„ ๋ณด์žฅ
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("offset", i * itemsPerPartition);
            context.putInt("limit", itemsPerPartition);
            context.putInt("partitionNumber", i);

            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

// OffsetBasedReader.java
// ์˜คํ”„์…‹ ๊ธฐ๋ฐ˜ ํŒŒํ‹ฐ์…”๋‹๊ณผ ํ˜ธํ™˜๋˜๋Š” Reader
@StepScope
@Component
public class OffsetBasedReader implements ItemReader<OrderRecord>, ItemStream {

    private final JdbcTemplate jdbcTemplate;
    private Iterator<OrderRecord> iterator;

    @Value("#{stepExecutionContext['offset']}")
    private int offset;

    @Value("#{stepExecutionContext['limit']}")
    private int limit;

    public OffsetBasedReader(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // ์ด ํŒŒํ‹ฐ์…˜์— ํ• ๋‹น๋œ ๋ถ€๋ถ„๋งŒ ์ •ํ™•ํžˆ ๋กœ๋”ฉ
        List<OrderRecord> records = jdbcTemplate.query(
                "SELECT * FROM orders WHERE status = 'PENDING' ORDER BY id LIMIT ? OFFSET ?",
                new OrderRecordRowMapper(),
                limit, offset
        );
        this.iterator = records.iterator();
    }

    @Override
    public OrderRecord read() {
        return iterator.hasNext() ? iterator.next() : null;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // ํ•„์š” ์‹œ ์žฌ์‹œ์ž‘์šฉ ์ƒํƒœ ์ €์žฅ
    }

    @Override
    public void close() {
        // ์ •๋ฆฌ
    }
}

์‹œ๋‹ˆ์–ด๋ฅผ ์œ„ํ•œ ๊ณ ๊ธ‰ ์งˆ๋ฌธ

์งˆ๋ฌธ 13: ์žก ๊ฐ„ ์˜์กด์„ฑ์„ ์–ด๋–ป๊ฒŒ ๊ด€๋ฆฌํ•ฉ๋‹ˆ๊นŒ

Spring Batch๋Š” ์žก ๊ฐ„ ์˜์กด์„ฑ์„ ์ž์ฒด์ ์œผ๋กœ ๊ด€๋ฆฌํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ์™ธ๋ถ€ ์˜ค์ผ€์ŠคํŠธ๋ ˆ์ดํ„ฐ(Airflow, Kubernetes CronJob)๋‚˜ JobExplorer๋ฅผ ํ™œ์šฉํ•œ ์ปค์Šคํ…€ ๊ตฌํ˜„์ด ํ•ด๋ฒ•์ž…๋‹ˆ๋‹ค.

JobDependencyService.javajava
// ์žก ๊ฐ„ ์˜์กด์„ฑ ๊ด€๋ฆฌ
@Service
public class JobDependencyService {

    private final JobExplorer jobExplorer;
    private final JobLauncher jobLauncher;
    private final Map<String, Job> jobs;

    public JobDependencyService(JobExplorer jobExplorer,
                                  JobLauncher jobLauncher,
                                  Map<String, Job> jobs) {
        this.jobExplorer = jobExplorer;
        this.jobLauncher = jobLauncher;
        this.jobs = jobs;
    }

    public JobExecution runWithDependencies(String jobName,
                                             JobParameters params,
                                             List<String> dependsOn) throws Exception {
        // ๋ชจ๋“  ์˜์กด์„ฑ์ด ์„ฑ๊ณตํ–ˆ๋Š”์ง€ ํ™•์ธ
        for (String dependency : dependsOn) {
            if (!hasSuccessfulExecution(dependency, params)) {
                throw new JobExecutionException(
                        "Dependency not satisfied: " + dependency);
            }
        }

        Job job = jobs.get(jobName);
        if (job == null) {
            throw new IllegalArgumentException("Unknown job: " + jobName);
        }

        return jobLauncher.run(job, params);
    }

    private boolean hasSuccessfulExecution(String jobName, JobParameters params) {
        // ๋™์ผํ•œ ๋น„์ฆˆ๋‹ˆ์Šค ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ COMPLETED์ธ ์‹คํ–‰ ๊ฒ€์ƒ‰
        return jobExplorer.findJobInstancesByJobName(jobName, 0, 1)
                .stream()
                .flatMap(instance -> jobExplorer.getJobExecutions(instance).stream())
                .filter(exec -> exec.getStatus() == BatchStatus.COMPLETED)
                .anyMatch(exec -> matchesBusinessParams(exec.getJobParameters(), params));
    }

    private boolean matchesBusinessParams(JobParameters actual, JobParameters expected) {
        // ๋น„์ฆˆ๋‹ˆ์Šค ํŒŒ๋ผ๋ฏธํ„ฐ ๋น„๊ต(์‹คํ–‰ ํƒ€์ž„์Šคํƒฌํ”„ ๋ฌด์‹œ)
        String actualDate = actual.getString("businessDate");
        String expectedDate = expected.getString("businessDate");
        return Objects.equals(actualDate, expectedDate);
    }
}

์งˆ๋ฌธ 14: Spring Batch ์žก์„ ์–ด๋–ป๊ฒŒ ํšจ๊ณผ์ ์œผ๋กœ ํ…Œ์ŠคํŠธํ•ฉ๋‹ˆ๊นŒ

Spring Batch ์žก ํ…Œ์ŠคํŠธ๋Š” ๊ณ„์ธต์  ์ ‘๊ทผ์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. ์ปดํฌ๋„ŒํŠธ ๋‹จ์œ„ ์œ ๋‹› ํ…Œ์ŠคํŠธ(reader, processor, writer), Step์˜ ํ†ตํ•ฉ ํ…Œ์ŠคํŠธ, ์žก ์ „์ฒด์˜ ์ข…๋‹จ๊ฐ„ ํ…Œ์ŠคํŠธ์ž…๋‹ˆ๋‹ค.

OrderProcessorTest.javajava
// ํ”„๋กœ์„ธ์„œ ์œ ๋‹› ํ…Œ์ŠคํŠธ
@ExtendWith(MockitoExtension.class)
class OrderProcessorTest {

    @Mock
    private PricingService pricingService;

    @Mock
    private ValidationService validationService;

    @InjectMocks
    private OrderItemProcessor processor;

    @Test
    void shouldProcessValidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(true);
        when(pricingService.calculatePrice(input)).thenReturn(new BigDecimal("12.50"));

        // When
        ProcessedOrder result = processor.process(input);

        // Then
        assertThat(result).isNotNull();
        assertThat(result.finalPrice()).isEqualTo(new BigDecimal("12.50"));
    }

    @Test
    void shouldFilterInvalidOrder() {
        // Given
        OrderRecord input = new OrderRecord(1L, 100L, BigDecimal.TEN);
        when(validationService.isValid(input)).thenReturn(false);

        // When
        ProcessedOrder result = processor.process(input);

        // Then - null์€ ํ•„ํ„ฐ๋ง๋จ์„ ์˜๋ฏธ
        assertThat(result).isNull();
        verify(pricingService, never()).calculatePrice(any());
    }
}
ImportJobIntegrationTest.javajava
// ์žก ์ „์ฒด์— ๋Œ€ํ•œ ํ†ตํ•ฉ ํ…Œ์ŠคํŠธ
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class ImportJobIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @BeforeEach
    void setup() {
        // ํ…Œ์ŠคํŠธ ์‚ฌ์ด์˜ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ์ •๋ฆฌ
        jobRepositoryTestUtils.removeJobExecutions();
        // ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ ์ดˆ๊ธฐํ™”
        jdbcTemplate.execute("DELETE FROM processed_orders");
        jdbcTemplate.execute("DELETE FROM orders");
    }

    @Test
    void shouldCompleteJobSuccessfully() throws Exception {
        // Given - ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ
        insertTestOrders(100);

        // When
        JobParameters params = new JobParametersBuilder()
                .addLocalDate("businessDate", LocalDate.now())
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncherTestUtils.launchJob(params);

        // Then
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(countProcessedOrders()).isEqualTo(100);
    }

    @Test
    void shouldHandleEmptyDataset() throws Exception {
        // Given - ๋ฐ์ดํ„ฐ ์—†์Œ

        // When
        JobExecution execution = jobLauncherTestUtils.launchJob();

        // Then - ๋ฐ์ดํ„ฐ๊ฐ€ ์—†์–ด๋„ ์žก์€ ์„ฑ๊ณต
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    @Test
    void shouldRestartFromFailurePoint() throws Exception {
        // Given - ์ฒ˜๋ฆฌ ๋„์ค‘์˜ ์˜ค๋ฅ˜ ์‹œ๋ฎฌ๋ ˆ์ด์…˜
        insertTestOrders(100);
        insertPoisonOrder(50);  // ์˜ค๋ฅ˜ ๋ฐœ์ƒ

        // When - ์ฒซ ์‹คํ–‰ ์‹คํŒจ
        JobExecution firstExecution = jobLauncherTestUtils.launchJob();
        assertThat(firstExecution.getStatus()).isEqualTo(BatchStatus.FAILED);

        // ๋ฐ์ดํ„ฐ ์ˆ˜์ •
        removePoisonOrder(50);

        // When - ์žฌ์‹œ์ž‘
        JobExecution restartExecution = jobLauncherTestUtils.launchJob(
                firstExecution.getJobParameters());

        // Then - ์‹คํŒจ ์ง€์ ๋ถ€ํ„ฐ ์žฌ๊ฐœ
        assertThat(restartExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    private void insertTestOrders(int count) {
        for (int i = 1; i <= count; i++) {
            jdbcTemplate.update(
                    "INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
                    i, i * 10, BigDecimal.valueOf(i * 10));
        }
    }

    private int countProcessedOrders() {
        return jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processed_orders", Integer.class);
    }
}

์งˆ๋ฌธ 15: ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์“ฐ๊ธฐ ์„ฑ๋Šฅ์„ ์–ด๋–ป๊ฒŒ ์ตœ์ ํ™”ํ•ฉ๋‹ˆ๊นŒ

์“ฐ๊ธฐ๋Š” ์ข…์ข… ๋ณ‘๋ชฉ์ด ๋ฉ๋‹ˆ๋‹ค. ์ตœ์ ํ™” ๋ฐฉ๋ฒ•์œผ๋กœ๋Š” JDBC ๋ฐฐ์น˜ ์ธ์„œํŠธ, ์ ์žฌ ๋™์•ˆ์˜ ์ œ์•ฝ ์กฐ๊ฑด ๋น„ํ™œ์„ฑํ™”, ์Šคํ…Œ์ด์ง• ํ…Œ์ด๋ธ” ํ™œ์šฉ์ด ์žˆ์Šต๋‹ˆ๋‹ค.

OptimizedJdbcWriter.javajava
// ๋Œ€์šฉ๋Ÿ‰์„ ์œ„ํ•œ ์ตœ์ ํ™”๋œ Writer
@Component
public class OptimizedJdbcWriter implements ItemWriter<ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;
    private final DataSource dataSource;

    public OptimizedJdbcWriter(JdbcTemplate jdbcTemplate, DataSource dataSource) {
        this.jdbcTemplate = jdbcTemplate;
        this.dataSource = dataSource;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
        List<? extends ProcessedOrder> items = chunk.getItems();

        if (items.isEmpty()) {
            return;
        }

        // ๋ฐฐ์น˜์™€ ํ•จ๊ป˜ PreparedStatement ์‚ฌ์šฉ
        try (Connection connection = dataSource.getConnection();
             PreparedStatement ps = connection.prepareStatement(
                     "INSERT INTO processed_orders (order_id, customer_id, final_price, processed_at) " +
                             "VALUES (?, ?, ?, ?)")) {

            for (ProcessedOrder order : items) {
                ps.setLong(1, order.orderId());
                ps.setLong(2, order.customerId());
                ps.setBigDecimal(3, order.finalPrice());
                ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                ps.addBatch();
            }

            // ๋ชจ๋“  INSERT๋ฅผ ํ•œ ๋ฒˆ์˜ ๋„คํŠธ์›Œํฌ ํ˜ธ์ถœ๋กœ ์‹คํ–‰
            ps.executeBatch();
        }
    }
}

// StagingTableWriter.java
// ์ดˆ๋Œ€์šฉ๋Ÿ‰์„ ์œ„ํ•œ ์Šคํ…Œ์ด์ง• ํ…Œ์ด๋ธ” ํŒจํ„ด
@Component
public class StagingTableWriter implements ItemWriter<ProcessedOrder>, StepExecutionListener {

    private final JdbcTemplate jdbcTemplate;
    private String stagingTable;

    public StagingTableWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // ์ด Step์„ ์œ„ํ•œ ์ž„์‹œ ํ…Œ์ด๋ธ” ์ƒ์„ฑ
        stagingTable = "staging_orders_" + stepExecution.getId();
        jdbcTemplate.execute(
                "CREATE TEMP TABLE " + stagingTable + " (LIKE processed_orders INCLUDING ALL)");
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) {
        // ์Šคํ…Œ์ด์ง• ํ…Œ์ด๋ธ”์— ๊ธฐ๋ก(FK ์ œ์•ฝ ์—†์Œ)
        String sql = "INSERT INTO " + stagingTable +
                " (order_id, customer_id, final_price, processed_at) VALUES (?, ?, ?, ?)";

        jdbcTemplate.batchUpdate(sql, chunk.getItems(), chunk.size(),
                (ps, order) -> {
                    ps.setLong(1, order.orderId());
                    ps.setLong(2, order.customerId());
                    ps.setBigDecimal(3, order.finalPrice());
                    ps.setTimestamp(4, Timestamp.valueOf(order.processedAt()));
                });
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
            // ์ตœ์ข… ํ…Œ์ด๋ธ”๋กœ ์ผ๊ด„ ๋ณต์‚ฌ
            jdbcTemplate.execute(
                    "INSERT INTO processed_orders SELECT * FROM " + stagingTable);
        }
        // ์Šคํ…Œ์ด์ง• ํ…Œ์ด๋ธ” ์ •๋ฆฌ
        jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
        return stepExecution.getExitStatus();
    }
}

๊ฒฐ๋ก 

๊ธฐ์ˆ  ๋ฉด์ ‘์—์„œ Spring Batch 5๋ฅผ ์ •๋ณตํ•˜๋ ค๋ฉด ๋‚ด๋ถ€ ๋ฉ”์ปค๋‹ˆ์ฆ˜์— ๋Œ€ํ•œ ๊นŠ์€ ์ดํ•ด๊ฐ€ ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค.

โœ… ์•„ํ‚คํ…์ฒ˜: Job โ†’ Step โ†’ Chunk(Reader, Processor, Writer)

โœ… ์ฒญํฌ ์ฒ˜๋ฆฌ: ํฌ๊ธฐ ์‚ฐ์ •, ๋ผ์ดํ”„์‚ฌ์ดํด, ํŠธ๋žœ์žญ์…˜

โœ… ํŒŒํ‹ฐ์…”๋‹: ๋กœ์ปฌ vs ์›๊ฒฉ, ํŒŒํ‹ฐ์…˜ ๊ท ํ˜•

โœ… ์žฅ์•  ํ—ˆ์šฉ: skip, retry, restart์— ์ ํ•ฉํ•œ ์ •์ฑ… ์ ์šฉ

โœ… ํ™•์žฅ: ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋”ฉ, parallel steps, remote chunking

โœ… ํ…Œ์ŠคํŠธ: ์œ ๋‹›, ํ†ตํ•ฉ, ์ข…๋‹จ๊ฐ„

โœ… ์ตœ์ ํ™”: ๋ฐฐ์น˜ ์“ฐ๊ธฐ, ์Šคํ…Œ์ด์ง• ํ…Œ์ด๋ธ”, ๋ชจ๋‹ˆํ„ฐ๋ง

๊ณ ๊ธ‰ ์งˆ๋ฌธ์€ ๋ฐ์ดํ„ฐ ์–‘, ์‹œ๊ฐ„ ์ œ์•ฝ, ์˜ค๋ฅ˜ ํ—ˆ์šฉ ์ˆ˜์ค€, ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ์ธํ”„๋ผ ๊ฐ™์€ ๋งฅ๋ฝ์— ๋”ฐ๋ผ ์•„ํ‚คํ…์ฒ˜ ๊ฒฐ์ •์„ ์ •๋‹นํ™”ํ•˜๋Š” ๋Šฅ๋ ฅ์„ ํ‰๊ฐ€ํ•ฉ๋‹ˆ๋‹ค.

์—ฐ์Šต์„ ์‹œ์ž‘ํ•˜์„ธ์š”!

๋ฉด์ ‘ ์‹œ๋ฎฌ๋ ˆ์ดํ„ฐ์™€ ๊ธฐ์ˆ  ํ…Œ์ŠคํŠธ๋กœ ์ง€์‹์„ ํ…Œ์ŠคํŠธํ•˜์„ธ์š”.

ํƒœ๊ทธ

#spring batch
#spring boot
#java
#batch processing
#interview questions

๊ณต์œ 

๊ด€๋ จ ๊ธฐ์‚ฌ