Java/Project Reactor

Project Reactor의 Scheduler

민철킹 2023. 3. 8. 01:14

Scheduler

Project Reactor에서 SchedulerThread를 관리하는 역할을 담당한다.

Project Reactor는 동시성에 구애받지 않고 개발자가 이를 직접 동제할 수 있다.

개발자가 이를 직접 통제할 수 있도록, 복잡한 멀티쓰레딩을 손쉽게 사용할 수 있도록 Scheduler가 도와준다.

Project Reactor는 일반적으로 호출된, 즉 subscribe()가 호출된 쓰레드에서 동작을 수행한다.

🤔무슨의미?

스크린샷 2023-03-07 오후 11 25 43

위 사진과 같은 상황에서 새로운 쓰레드에서 subscribe()를 호출하고 있기 떄문에 메인 쓰레드가 아닌 호출된 새로운 쓰레드에서 동작을 수행한다.

 

 


 

 

Scheduler의 종류

reactor.core.scheduler 하위에는 Scheduler 인터페이스를 구현한 여러 구현체들이 존재한다.

 

추상 클래스인 Schedulers에서 제공하는 팩토리 메서드를 사용해 여러 구현체들을 손쉽게 생성하고 사용할 수 있다.

 

1. ImmediateScheduler

이는 현재 쓰레드에서 작업을 즉시 실행하는 Scheduler이다.(Runnable이 바로 실행됨)

Schedulers.immediate()를 사용해 생성할 수 있다.

 

작업이 즉시 실행이 되므로 지연시간이 없는 빠른 반응성을 제공하는 특징을 가지고 있다.

 

또한, 쓰레드 풀이나 큐를 별도로 사용하지 않고 있기 때문에 성능상의 이점이 있다.

 

하지만 동시성이 고려되지 않은 Scheduler이므로 작업이 실행되는 동안 Blocking이 될 수 있어, 작업 시간이 길다면 사용을 지양하는 것이 좋다.

 

 

2. SingleScheduler

싱글 쓰레드를 사용하여 작업을 처리하는 Scheduler이다.

Schedulers.single()을 사용해 생성할 수 있다.

 

SingleScheduler는 내부적으로 쓰레드를 캐시하지 않기 때문에 매번 호출할 때마다 새로운 쓰레드를 생성한다.

쓰레드를 하나를 사용하기 때문에 동시성을 보장할 수 있고 이를 위해 내부적으로 쓰레드를 관리한다.(순차적 실행)

하나의 쓰레드를 사용하기 때문에 위와 마찬가지로 별도의 쓰레드 풀이나 큐를 사용하지 않는다.

 

모든 작업을 하나의 쓰레드에서 처리하기 때문에 작업양이 많은 경우에는 지양하는 것이 좋다.

순차적으로 처리해야하는 비동기 작업(네트워크, DB 연결)과 같은 작업에 유용하게 사용할 수 있다.

 

호출할 때마다 해당 작업에 대한 전용 쓰레드를 사용하고 싶다면 Schedulers.newSingle()를 통해 싱글 쓰레드를 여러개 만들 수 있다.

 

SingleSchedulerFluxSink와 함께 사용하여 비동기 작업을 처리한다.

FluxSinkSubscriber와 비슷한 역할을 한다.

 

엥? 공식문서에는 Schedulers.single()를 사용하면 동일한 쓰레드를 재사용할 수 있다는데요?

 

SingleScheduler는 내부적으로 캐시를 하지 않기 때문에 매번 새로운 단일 쓰레드를 생성한다.

 

하지만 팩토리 메서드가 존재하는 Schedulers가 내부적으로 쓰레드를 캐시하기 때문에 Schedulers.single()의 반환값이 SingleScheduler이지만 매번 동일한 쓰레드를 재사용할 수 있다.

 

 

3. SingleWorkerScheduler

SingleWorkerSchedulerSingleScheduler와 유사하다.

하지만 SingleWorkerScheduler는 내부적으로 쓰레드를 캐시하기 때문에 한번 생성된 뒤에는 매번 똑같은 쓰레드를 재사용한다.

 

SingleWorkerSchedulerSchedulerProcessorPublisher와 함께 사용하여 작업을 처리한다.

Publisher가 작업을 Processor에게 전달하고 이 작업을 단일 쓰레드에서 처리한다.

 

따라서, SingleScheduler보다는 더 많은 작업량을 처리할 수 있다.

 

 

4. ElasticScheduler

쓰레드 풀을 사용해 작업을 처리하는 Scheduler이다.

Schedulers.elastic()을 사용해 생성할 수 있다.

 

쓰레드 풀을 기반으로 하기 때문에 작업량에 따라 쓰레드의 개수를 동적으로 조정할 수 있다.

제일 처음 초기화될 시점에 쓰레드 풀의 쓰레드 개수는 0개이다.

 

따라서, 작업량에 따라 최적의 쓰레딩을 유지하므로 높은 처리량성능을 보장한다.

일반적으로, 비동기 처리나 작업이 긴 경우에 유용하게 사용할 수 있다.

 

하지만, 쓰레드에 대한 제한이 없어 요청시마다 쓰레드를 생성하는 문제가 있어 Backpressure 전략을 제대로 지원하지 못해 BoundedElasticScheduler로 대체된다.

 

 

5. BoundedElasticScheduler

ElasticScheduler와 매우 유사하다.

Schedulers.boundedElastic()을 사용해 생성할 수 있다.

 

ElasticScheduler와의 차이점으로는 유휴시간이 긴 쓰레드가 있다면 폐기하며 쓰레드 풀의 최대 쓰레드의 수를 제한한다.

기본값은 유휴시간은 60초, 쓰레드 수 제한은 CPU 코어 X 10이다.

 

쓰레드 수가 최대치에 도달한뒤의 작업은 100,000개 까지 큐에 담고 작업 가능한 쓰레드가 생긴다면 재스케쥴링을 통해 작업을 처리한다.

너무 많은 작업이 동시에 처리되는 것을 방지하여 작업의 안정성을 높이기 위한 전략이다.

 

처리량도 높고 안정성도 높기 때문에 대규모의 동시성 작업(HTTP Request, DB Query)에 유용하게 사용할 수 있다.

 

 

6. ParallelScheduler

다중 스레드를 사용해 작업을 병렬로 처리하는 Scheduler이다.

이는 내부적으로 고정된 쓰레드 풀을 사용한다.

 

ParallelScheduler는 데이터를 작은 여러개의 청크로 쪼개고 해당 청크를 병렬로 처리하고 다시 합치는 방식으로 작업을 처리한다.

 

기본적으로 CPU 코어 수만큼의 쓰레드를 생성한다.

CPU 코어 수만큼 쓰레드를 생성하기 때문에 작업양이 커 CPU를 많이 사용하더라도 최대한의 성능을 보장한다.

 

Reactive StreamBackpressure기능과 함께 사용될 때 더욱 안정된 작업을 보장할 수 있다.

ParallelScheduler는 대량의 데이터를 처리해야할 때 유용하다.(DB Query, 대규모의 파일 등)

 

 

7. DelegateServiceScheduler

이름에서 알 수 있듯이 DelegateServiceScheduler는 다른 Scheduler 래핑하여 스케쥴링을 위임한다.

생성자를 통해 다른 Scheduler를 인자로 받고 해당 Scheduler의 기능을 그대로 사용한다.

부가적인 기능(로깅, 예외처리)을 추가할 수 있기 때문에 보다 유연하게 사용할 수 있다.

 

 

8. ExecutorScheduler

Executor를 기반으로 동작하는 Scheduler이다.

Executor를 인자로 받아 생성된다.

public interface Executor {
    void excute(Runnable command);
}

// Executor는 java.util.concurrent 하위에 존재하는 쓰레드를 관리하는 인터페이스로써 쓰레드를 생성하고 쓰레드 풀을 만들고 쓰레드에 작업을 할당하는 역할을 한다.

// 비동기 작업 처리를 위한 인터페이스로써, `Runnable`을 구현한 객체를 실행하는 객체가 구현해야하는 인터페이스이다.

while(true){
  Request request = acceptRequest();
  Runnable requestHAndler = new RequestHandler(request);
  new Thread(requestHandler).start();
}

// 위와 같은 코드에서 요청시마다 새로운 쓰레드를 생성하고 종료한다면 많은 오버헤드가 발생하고 JVM에 부하를 주게되어 메모리 누수가 발생할 수 있다.

// 이를 Executor를 구현한 객체를 통해 해결할 수 있다.

Executor executor = new Executor();

while(true) {
    Request request = acceptRequest();
    Runnable requestHandler = new RequestHandler(request);
    executor.excute(requestHandler);
}

따라서 ExecutorSchedulerExecutor를 사용해 작업을 처리하고, Executor는 쓰레드 풀을 제공한다.

Executor의 종류에 관계없기 때문에 기존에 존재하던 것을 래핑해 재사용할 수 있다.

 

반응형