[Spring/Batch] HTTP 요청으로 실행되는 배치 만들기

[Spring/Batch] HTTP 요청으로 실행되는 배치 만들기

Spring Batch는 대규모 데이터를 일괄(batch)처리 할 때 유용한 프레임워크 이다. 이 글에서는 Spring Batch에 대해서 간단하게 알아보고, API 호출로 Batch를 실행하는 방법과 고려해야 할 사항을 알아보자.

Spring Batch는 Java로 배치 처리 애플리케이션을 구축 할 때 사용하는 아주 강력한 프레임워크이다. 대량의 데이터를 처리하거나 복잡한 비즈니스 프로세스를 자동화 하는 경우 유용하게 사용 할 수 있다.

Spring Batch는 Spring Framework로 구축되어 작업 관리, 데이터 처리, 오류 처리일반적으로 배치 처리를 할 때 사용되는 강력한 기능들을 제공하기 때문에 시간이 오래 소요되는 데이터 처리 작업은 Spring Batch를 사용하는 것이 좋다.

일반적으로 Spring Batch는 특정 시간에 자동으로 실행 될 수 있도록 스케줄러와 연결하여 사용하는데, 사용자의 요청으로 배치를 실행해야 하는 경우가 있다.

이 글에서는 Spring Batch의 구현과 사용자 요청으로 Batch를 실행하는 과정, 그리고 고려해야 할 사항을 설명한다.

이 글은 Spring Boot 3.0.4 (Spring Framework 6.0.6, Spring Batch 5.0.1) 기준으로 작성되었으므로, 이전 버전과는 다소 차이가 있을 수 있다.

1. Spring Batch의 구조

Spring Batch를 구현하기에 앞서 구조를 먼저 이해해야 한다.

graph TD
    A[Job] --> B[Step 1]
    A --> C[Step 2]
    A --> D[Step 3]
    B --> E[(Reader)]
    B --> F[(Processor)]
    B --> G[(Writer)]
    C --> H[(Reader)]
    C --> I[(Processor)]
    C --> J[(Writer)]
    D --> K[(Tasklet)]

그림 1. Spring Batch의 구조

A. Job

Spring Batch는 배치 작업 단위를 Job으로 표현한다. 위의 그림 1 처럼 하나의 Job은 여러개의 Step로 구성될 수 있다. Job에는 실행 할 때의 설정값과 상태 정보를 가지고 있다.

B. Step

Job 내에서의 구체적인 작업의 단위이다. 각 Step은 하나 이상의 Task로 구성 할 수 있으며, 순차적으로 실행된다.

C. Task

Task는 Step 내에서 단일 작업 단위를 나타낸다. 각 Task는 Reader, Processor, Writer 중 하나의 역할을 수행하거나 Tasklet으로 복잡한 작업을 수행하도록 구현 할 수 있다.

D. Chunk

그림 1에는 표현되어 있지 않지만 Chunk는 Task를 구성하는 데이터의 덩어리이다. 하나의 Chunk 내에서는 일괄 처리 방식으로 일정한 레코드 수만큼 처리된다. Chunk의 크기는 작업의 복잡성과 데이터의 크기 등을 고려해서 설정해야 한다.

2. Spring Batch 시작하기

A. 라이브러리 의존성 추가하기

Spring Boot에서 Spring Batch를 사용하기 위해서는 spring-boot-starter-batch가 필요하다. 아래 코드와 같이 라이브러리 의존성을 추가 할 수 있다.

// file: 'build.gradle.kts'
...중략

dependencies {

    ...중략

    implementation("org.springframework.boot:spring-boot-starter-batch")
    testImplementation("org.springframework.batch:spring-batch-test")

    ...중략
  
}

...중략

B. Spring Batch 테이블 초기화

Spring Batch의 실행 이력과 실행 정보를 저장하기 위해 데이터베이스 스키마를 초기화해야 한다. 아래 코드와 같이 spring.batch.jdbc.initialize-schema Property를 always로 설정하거나, org.springframework.batch.core 패키지 내의 schema-[DB플랫폼].sql을 직접 실행하여 초기화 할 수 있다.

# file: 'application.yml'
spring:
  batch:
    jdbc:
      initialize-schema: always

3. Spring Batch 사용하기

A. Job, Step, Task 정의

아래 코드는 Configuration에 Job, Step, 그리고 Task를 정의한 코드이다. 일반적으로 Job, Step, Task는 Bean으로 정의한다.

// file: 'MyBatchConfig.kt'
@Configuration
class MyBatchConfig(
    private val jobRepository: JobRepository,
    private val platformTransactionManager: PlatformTransactionManager,
    private val entityManagerFactory: EntityManagerFactory,
    private val taskExecutor: AsyncTaskExecutor,
) {

    @Bean
    fun myJob() = JobBuilder("MyJob", jobRepository)
        // 처음에 실행 할 Step
        .start(myStep1())
        // 다음에 실행 할 Step
        .next(myStep2())
        .build()

    /**
     * Reader, Processor, Writer를 사용하는 Step
     */
    @Bean
    fun myStep1() = StepBuilder("MyStep1", jobRepository)
        // chunk size는 10으로 설정
        .chunk<InputData, OutputData>(10, platformTransactionManager)
        // 병렬처리를 원한다면 taskExecutor를 설정
        .taskExecutor(taskExecutor)
        .reader(inputDataReader())
        .processor(inputDataToOutputDataProcessor())
        .writer(outputDataWriter())
        .build()

    /**
     * Tasklet을 사용하는 Step
     */
    @Bean
    fun myStep2() = StepBuilder("MyStep2", jobRepository)
        .tasklet(myStep2Tasklet(), platformTransactionManager)
        .build()

    /**
     * JpaPagingItemReader를 사용하여 데이터를 읽어옴
     */
    @Bean
    fun inputDataReader() = JpaPagingItemReaderBuilder<InputData>()
        .name("inputDataItemReader")
        .entityManagerFactory(entityManagerFactory)
        .pageSize(10)
        .queryString("select i from InputData i")
        .build()

    /**
     * ItemProcessor를 사용하여 데이터를 가공
     */
    @Bean
    fun inputDataToOutputDataProcessor() = ItemProcessor<InputData, OutputData> { item ->
        // Do something
        // null 반환시 writer에 전달하지 않음
        OutputData()
    }

    /**
     * JpaItemWriter를 사용하여 데이터를 저장
     */
    @Bean
    fun outputDataWriter() = JpaItemWriterBuilder<OutputData>()
        .entityManagerFactory(entityManagerFactory)
        .build()

    /**
     * Tasklet 정의
     */
    @Bean
    fun myStep2Tasklet() = Tasklet { contribution, chunkContext ->
        // Do something
        RepeatStatus.FINISHED
    }

}

Spring Batch 버전이 5.0이 되면서 JobBuilderFactoryStepBuilderFactory가 Deprecated 되었다. 이제 위의 코드처럼 JobBuilderStepBuilder를 이용해 Job과 Step을 정의해야 한다.

B. Job 자동 실행 방지

Spring Boot를 사용하는 경우 애플리케이션을 실행하면 Bean으로 등록된 모든 Job이 자동으로 실행된다. 자동 실행을 방지하려면 spring.batch.job.enabled Property를 false로 설정해야 한다.

# file: 'application.yml'
spring:
  batch:
    job:
      enabled: false

C. API로 Job 실행하기

JobLauncher를 이용하면 런타임에 Job을 실행 할 수 있다. JobLauncher는 Bean으로 주입받아 사용할 수 있다. JobExecution run(Job job, JobParameter parameter)메소드를 호출하면 Job이 실행된다. 아래 코드는 Controller와 Service 코드의 예시이다.

// file: 'MyController.kt'
@RestController
@RequestMapping("/api/v1/my/batch")
class MyController(
    private val myService: MyService
) {

    @PostMapping
    fun runBatch(
        @RequestBody parameter: MyBatchParameters
    ) = myService.runMyBatch(parameter)

}
// file: 'MyService.kt'
@Service
class MyService(
    private val jobLauncher: JobLauncher,
    private val myJob: Job,
) {

    fun runMyBatch(parameter: MyBatchParameters) {
        // JobParameter 설정
        JobParametersBuilder().apply {
            addString("username", parameter.username)
            addLocalDate("date", LocalDate.now())
        }.toJobParameters().let { parameters ->
            // 배치 실행
            jobLauncher.run(myJob, parameters)
        }
    }

}

4. API로 Job 실행시 고려해야 할 사항

A. 비동기 처리

기본적으로 JobLauncher를 통해 Job을 실행하면 동기적으로 실행된다. 일반적으로 Batch 작업은 대용량 데이터를 처리하기 때문에 오랫동안 실행된다. 따라서 Job의 실행이 길다면 작업이 완료 될 때까지 요청에 대한 응답을 받을 수 없다. 따라서 API로 Job을 실행하는 경우에 비동기로 실행해야 한다. TaskExecutorJobLauncher를 이용하면 Job을 비동기로 실행 할 수 있다.

아래 코드처럼 TaskExecutorJobLauncher를 생성하고, TaskExecutor를 설정해야한다. 그리고 Job을 실행 할 때, 생성한 TaskExecutorJobLauncher로 실행하면 비동기적으로 실행된다.

아래 코드는 비동기로 Job을 실행하는 예시이다.

// file: 'BatchConfig.kt'
@Configuration
class BatchConfig(
    private val taskExecutor: TaskExecutor,
    private val jobRepository: JobRepository,
) {

    ...중략

    @Bean
    fun taskExecutorJobLauncher() = TaskExecutorJobLauncher()
        .apply {
            setTaskExecutor(taskExecutor)
            setJobRepository(jobRepository)
        }

    ...중략

}
// file: 'MyService.kt'
@Service
class MyService(
    // TaskExecutorJobLauncher 주입
    private val taskExecutorJobLauncher: TaskExecutorJobLauncher,
    private val myJob: Job,
) {

    fun runMyBatch(parameter: MyBatchParameters) {
        // JobParameter 설정
        JobParametersBuilder().apply {
            addString("username", parameter.username)
            addLocalDate("date", LocalDate.now())
        }.toJobParameters().let { parameters ->
            // Job 실행
            taskExecutorJobLauncher.run(myJob, parameters)
        }
    }

}

B. JobInstance의 중복 방지

같은 JobInstance가 중복 생성되지 않도록 해야한다. JobInstance가 중복되면, 같은 작업을 중복하여 처리할 수 있기 때문에 데이터 불일치 등의 문제가 발생할 수 있다. Job을 실행하기 전에 JobInstance의 실행 여부를 체크 하거나, JobParameter를 적절하게 사용하면 JobInstance의 중복 생성을 방지 할 수 있다.

JobInstance는 Job + JobParameter의 조합으로 Job을 실행할 때 생성된다. Job의 실행 정보가 저장되어 있으며, 기본적으로 동일한 JobParameter로 JobInstance가 중복 생성되지 않는다.

5. 나의 사례

서비스에 타 쇼핑몰의 상품을 가져와 저장하는 기능의 개발이 필요했다. 이 기능을 Spring Batch로 구현한 이유와, API로 Job을 실행한 이유는 아래와 같다.

A. Spring Batch로 구현한 이유

타 쇼핑몰의 상품을 우리 서비스에 저장하는데 상품 데이터 fetch, 가공, 검색엔진 저장 등 일련의 과정들이 필요한데, 많은 수의 상품을 앞의 과정에 따라 처리 하면 많은 시간이 필요 할 것이라 판단했다. 따라서 안정적으로 데이터를 처리하기 위해 Spring Batch로 구현했다.

B. API로 Job을 실행한 이유

상품 정보를 가져오기 전에 관리자가 수동으로 해야 하는 작업이 있기 때문에 수동 작업이 완료된 후 임의로 상품을 가져 올 수 있도록 개발해야 했다. 또, 한 번만 타 쇼핑몰의 상품들을 가져오면 이후에는 WebHook으로 상품 정보를 보내주기 때문에, 특별한 경우가 아니면 Batch는 단 한 번만 실행하면 된다.

관리자가 해야 할 수동 작업이란, 타 쇼핑몰과 우리 쇼핑몰의 카테고리 분류 체계가 다르기 때문에 타 쇼핑몰의 카테고리 정보를 fetch 후 서비스의 카테고리 정보와 수동으로 맵핑해주는 작업이 필요했다.

참고 및 출처