TroubleShooting

Spring Batch 성능 개선기(병렬처리)

민철킹 2024. 4. 5. 20:27

최근 회사에서 프로젝트를 진행하며 사용했던 Spring Batch 성능 개선기를 기록해보려합니다.

먼저, 간략하게 배치가 어떤 흐름으로 동작하는지를 알아보면

 

1. 대상 상품 조회(Reader)대상
2. 상품중 비즈니스 로직으로 필터링(Processor)
3. api 호출 및 결과 db 저장(Writer)

 

와 같이 동작하고 있습니다.

 


 

1. 병목 지점 찾기

위 프로세스로 동작하는 프로토타입 레벨의 배치는 대상 상품 30 ~ 40만개 기준 4시간 ~ 6시간 정도가 소요되고 있었습니다.

 

별도의 커스텀한 설정을 따로 하지 않았기 때문에 싱글 쓰레드 기반으로 동작할 것이고 하나씩 순차적으로 처리되고 있습니다.

 

 

step을 분리하여 병목지점을 분석한 결과는 다음과 같은데요.

  • Mongo DB 단건 저장
  • 3번의 api call

 


 

2. Bulk Insert로 변경

Bulk Insert를 위해 stream문 안에서 xxxMongoRepository.save()를 하고 있던 방식을 stream 밖에서 xxxMongoRepository.saveAll()과 같이 변경하였습니다.

 

 

잠깐 savesaveAll은 어떤 차이가 있기에 Bulk Insert로 동작하는지를 알아보면

saveAll은 먼저 주어진 엔티티들이 모두 새로운 엔티티인지를 검사합니다.

 

 

 

만약 모두 새로운 엔티티라면 MongoOperationsinsert를 호출하는데요. 이 구현체를 살펴보면 아래와 같이 doInsertBatch를 통해 실제 Bulk Insert를 진행하고 있습니다.

 

 

(참고로, MongoRepository가 제공하는 insert를 호출할 시에 이미 id가 존재한다면 예외가 발생합니다. save는 위와같이 새로운 엔티티인지를 판단하고 upsert를 하기 때문에 예외가 발생하지 않습니다)

 

 

Bulk Insert를 통해 DB Network IO를 많이 줄여 눈에띄는 성능 변화를 기대했지만 실제론 굉장히 미비했습니다.

 

 

그 이유는 allNew에 있는데요.

 

배치의 대상이 되는 상품 특성상 대부분 새로운 엔티티보다는 이미 저장되어 수정되는 엔티티였기에 saveAll을 호출했지만 실제론 if문을 만족하지 못하여 stream을 돌며 save()를 호출하고 있었던 것 입니다.

 

 

따지고보면 WriterStream에서 Chunk Size만큼 한번 그 다음 saveAllStream에서 Chunk Size만큼 동일하게 2번 돌았다고 볼 수 있겠죠.

 

 


 

 

3. api call 병렬처리

 

사실 가장 명확한 병목지점이자 성능 저하의 주범이 되고 있었던 3번의 api call 개선하는 것이 가장 크게 성능을 향상시킬 수 있는 지점이라고 생각했습니다.

 

 

api call 하는 지점을 kafka를 통해 이벤트 비동기식으로 처리도 고려해보았지만 모든 api call동기식으로 처리되고 다음 프로세스가 진행되어야 하는 비지니스 로직 특성상 단순히 쓰레드 풀을 활용한 병렬처리가 적용가능한 가장 확실한 방법이라고 생각했습니다.

 

 

또한, 프로젝트의 특성상 awsidc를 api를 통해 왔다갔다해야만 했기에 api call을 통한 요청과 응답이 최우선적으로 고려되었던 것 같습니다.

 

 

3개의 api는 아래와 같습니다.

  • 내부 msa 기반 api 2개
  • 외부 솔루션 업체 api 1개

 

외부 솔루션 업체 api는 해당 업체에서 받을 수 있는 트래픽 자체가 정해져 있기 때문에 병렬처리를 하더라도 일정 수준을 넘어가면 모두 500 에러를 보내고 있었습니다.

 

 

따라서 하나의 writer에서 처리되던 3개의 api call을 2개의 Step으로 분리하고 내부 api call만 쓰레드 풀을 활용한 병렬처리를 통해 성능을 개선하려 하였습니다.

 

 

Spring Batch에서는 아래와 같이 step을 만드는 시점에 굉장히 간단하게 taskExecutor를 통해 병렬처리가 가능해집니다.

 

 

 

ThreadPoolTaskExecutor를 사용하면 아래와 같이 쓰레드 풀을 활용해 여러 쓰레드가 step을 병렬로 처리하며 작업할 수 있게 됩니다.

 

 

 

여기서 잠깐 ThreadPoolTaskExecutor에 대해 간단하게 알아보자면

 

Executor가 생성되는 시점에 BlockingQueue를 생성합니다.

 

여기에는 스레드가 처리할 각 개별 task가 저장됩니다.

이를 각 개별 쓰레드에서 꺼내에 Runnable을 통해 실행합니다.

 

 

 

이 큐에는 용량이 존재하는데요(queueCapacity, 디폴트는 Interger.MAX_VALUE)

만약 용량이 가득 찼다면 큐에 용량이 생길 때까지 적재하지 않고 블락킹됩니다.
이를 통해 제한된 리소스 내에서 안전하게 처리가 가능해집니다.

 

 

 

또한, 제가 설정한 옵션 외에도 지원하는 여러 다양한 옵션들이 있기에 필요에따라 커스텀하게 설정하면 될 듯 합니다.

 

기본 쓰레드 풀과 최대 쓰레드 풀 사이즈를 설정할 수 있는데 이는 아래와 같이 내부적으로 동작합니다.

 

기본 사이즈가 10, 최대가 20이라고 가정해보겠습니다.

최초 작업 생성시 쓰레드 풀의 쓰레드는 10개가 생성됩니다. 이 쓰레드들은 BlockingQueue에서 작업을 꺼내와 처리한다고 이야기했는데요.

 

최초 생성된 쓰레드만으로 100% 처리할 수 없는 경우, 즉 큐에 계속 작업이 쌓여 밀리고 있는 경우 ThreadPoolTaskExecutor는 새로운 스레드를 추가하여 최대 쓰레드 풀 사이즈까지 스레드를 생성합니다.

 

만약 최대로 생성하였는데도 queue에 작업이 계속 쌓인다면 기본적으로는 작업이 대기상태가 됩니다.

 

 

 

ThreadPoolTaskExecutor에 대한 기본적인 설명은 여기까지로 다시 배치 이야기로 돌아가보겠습니다.

 

이렇게 TaskExecutor를 활용해 내부 api 호출만 10코어로 병렬처리하도록 만들고 테스트를 진행했습니다.

 

 

결과는?..

 

 

 

대상 데이터가 약 30~40만개였다고 처음에 말씀드렸는데요.

 

배치에서 읽어들인 데이터가 이를 훨씬 초과했습니다. 중복되서 데이터가 처리되고 있다는 뜻입니다.

 

 

원인은 Reader에 있었는데요.

 

대상 상품수가 그렇게 대량도 아니고 단순 속도면에서도 Cursor 방식이 Paging 방식보다는 뛰어나기 때문에 저는 CursorItemReader 를 사용하고 있었습니다.

 

 

CursorItemReader는 사실 Thread-safe 하지 않은데요.

따라서, Thread-safe하지 않은 Reader를 사용하면서 멀티 쓰레딩을 사용하고 있었기 때문에 reader가 데이터를 중복해서 읽고 이를 중복해서 처리하고 있었습니다.

 

CursorItemReader를 단순히 PagingItemReader로 변경하는 것만으로 이 문제는 해결될 수 있는데요.(PagingItemReader는 Thread-safe 합니다)

 

저는 조금 다른 방식을 사용하였습니다.

 

 

SynchronizedItemStreamReader를 활용하는 건데요. 이 방식을 사용하면 기존 코드의 변경도 거의 없이 굉장히 간단하게 Non Thread safe을 Thread safe하게 변경할 수 있습니다.

끝입니다.

 

SynchronizedItemStreamReader로 기존에 사용중이던 Reader를 한번 컨버팅하는 것만으로 제 CursorItemReader는 Thread-safe 해졌습니다.

 

 

 

 

내부 구현을 보면 내부 delegate(내부에 있는 실제 reader)를 호출 메서드에 synchronized 키워드를 사용하고 있는 것을 확인할 수 있습니다.

 

그래서 성능 개선이 됐을까요?

 

결론부터 말씀드리면 성능 개선에 엄청한 효과가 있었습니다.

기존 평균 4~6시간이 소요되던 배치를 step으로 병목지점을 분리하고 병렬처리를 통해 개선한 뒤 1시간 내외로 수행되고 있습니다.

 

 

이는 쓰레드풀 사이즈가 10인 기준이고 서버의 리소스가 충분해 사이즈를 늘린다면 훨씬 더 빠르게 처리할 수 있겠죠.

 

 


 

 

4. 마치며

 

병목지점을 찾고 무언가를 개선하는 일은 개인적으로 프로그래밍에 있어 가장 흥미를 느끼게 되는 요소인듯 합니다.

 

제가 사용한 방법이 정답은 아니기에 시스템의 성격이나 케이스에 맞게 여러 방법을 적용하면 될 것 같습니다.

 

 

 

단순히 병렬 처리가 정답은 아니지만 이 방식을 사용하게 된다면 주의할 점을 몇가지 말씀드리면서 글을 마무리하려합니다.

  • 서버의 리소스
    • 본인이 가용할 수 있는 리소스를 파악하는 것이 중요합니다.
    • 서버의 리소스를 소비해 처리 속도를 높이는 방식이다보니 서버에 부하를 줄 수 있습니다.
    • 이는 db 또는 저와 같이 외부 api를 호출하는 경우라면 해당 시스템에도 부하를 줄 수 있기 때문입니다.
    • 호출되는 곳이 병렬처리를 통해 평소보다 많은 트래픽을 받아 부하를 받아 문제가 되진 않는지 확인이 먼저 필요합니다.

 

  • thread safe
    • 병렬 처리이다보니 thread safe한지가 매우 중요합니다.
    • 그렇지 않으면 원하지 않는 방향으로 배치가 동작할 수 있습니다.

 

 

  •  재시작
    • 병렬 처리를 하면 어느 쓰레드가 어디까지 수행했는지를 판단하기가 어렵습니다
    • 그래서 Spring Batch가 제공하는 기능중 실패한 지점부터 Job을 재시작하는 기능을 사용할 수 없습니다.

 

 

  • 트랜잭션
    • 각각의 쓰레드의 트랜잭션이 별도로 존재하고 같은 Job이지만 각각의 쓰레드는 독립적인 Writer나 Processor를 가지고 있기 때문에 트랜잭션이 중요하다면 트랜잭션을 적절히 관리해야합니다.
반응형

'TroubleShooting' 카테고리의 다른 글

모니터링 습관화하기  (2) 2024.01.11