Reactive Programming
Flux
Flux는 Reactive Streams에서 정의한 Pulisher 구현체이다.
0~N개의 테이터를 발행할 수 있고 하나의 데이터를 전달할 때마다 onNext 이벤트를 발생시킨다.
모든 데이터가 전달 완료되면 onComplete 이벤트가 발생한다.
중간에 에러가 발생하면 onError 이벤트가 발생한다.
non-blocking 상태의 Flux는 for문을 통해 하나씩 꺼내어 사용하지 않고 만들 수 있다.
just 메서드를 사용해 Flux객체를 만들 수 있다. subscribe 메서드를 사용해 stream형식으로 데이터가 흐르도록한다.
range 메서드를 통해 해당 범위의 숫자를 담는 Flux 객체를 만들 수 있다.
fromArray를 사용해 위와 같은 새로운 배열을 Flux객체로 만들 수도 있다.
즉, 이런 식의 코드도 가능하다.
핵심은 blocking 방식의 객체를 non-blocking 객체로 바꿀 수 있다라는 점이다.
fromIterable을 사용해 Iterable을 구현한 List와 같은 객체를 Flux객체로 만들 수 있다.
또한 자바8 이후로 굉장히 많이 사용되는 stream을 Flux객체로 바꿀 수도 있는데 위 상태에서는 현재 List이기 때문에 컴파일 오류가 발생한다. 따라서 .stream()을 추가해 stream으로 바꿔주면된다.
구구단 3단을 만드는 예시이다.
어떤 객체에 이벤트가 발생했을 때 어떤 처리를 하도록 만들 수 있음.
Flux.create()와 배압
- Subscriber로부터 요청이 왔을 때(FluxSink#onRequest) 데이터를 전송하거나(pull 방식)
- Subscriber의 요청에 상관없이 데이터를 전송하거나(push 방식)
- 두 방식 모두 Subscriber가 요청한 개수보다 더 많은 데이터를 발생할 수 있다.
- 이 코드는 Subscriber가 요청한 개수보다 3개 데이터를 더 발생한다. 이 경우 어떻게 될까?
- 기본적으로 Flux.create()로 생성한 Flux는 초과로 발생한 데이터를 버퍼에 보관한다.
- 버퍼에 보관된 데이터는 다음에 Subscriber가 데이터를 요청할 때 전달된다.
- 요청보다 발생한 데이터가 많을 때 선택할 수 있는 처리 방식은 다음과 같다.
- IGNORE : Subscriber의 요청 무시하고 발생(Subscriber의 큐가 다 차면 IllegalStateException 발생)
- ERROR : 익셉션(IllegalStateException) 발생
- DROP : Subscriber가 데이터를 받을 준비가 안 되어 있으면 데이터 발생 누락
- LATEST : 마지막 신호만 Subscriber에 전달
- BUFFER : 버퍼에 저장했다가 Subscriber 요청시 전달. 버퍼 제한이 없으므로 OutOfMemoryError 발생 가능
- Flux.create()의 두 번째 인자로 처리 방식을 전달하면 된다.
요청이 왔을 때 객체를 생성함. Lazy로딩보다 더 Lazy하다고 생각하자.
빈 Flux 객체를 empty를 통해 생성한다.
Mono
Mono 또한, Reactive Streams의 Publisher 구현체이다.
Mono는 0~1개의 데이터를 처리한다.(건 by 건)
1이 담긴 Mono를 생성한다. doOnEach메서드를 사용해 발생한 Signal<Integer>을 리스트에 담아준다.
doOnEach는 데이터를 방출 할 때, 혹은 완료 에러가 발생했을 때의 고급 이벤트로써, 파라미터로 Signal 이 넘어온다. 이 신호에는 context 도 포함되어 있다. 주로 모니터링으로 사용한다고 한다
즉, 위 코드에서 onNext 한번, onComplete한번이 list에 담겨 size가 2이다.