빗썸 테크 아카데미/TIL

9일차

민철킹 2021. 8. 31. 19:51

Reactive Programming

Flux

Flux는 Reactive Streams에서 정의한 Pulisher 구현체이다.

 

0~N개의 테이터를 발행할 수 있고 하나의 데이터를 전달할 때마다 onNext 이벤트를 발생시킨다.

 

모든 데이터가 전달 완료되면 onComplete 이벤트가 발생한다.

 

중간에 에러가 발생하면 onError 이벤트가 발생한다.

Flux

 

non-blocking 상태의 Flux는 for문을 통해 하나씩 꺼내어 사용하지 않고 만들 수 있다.

 

just 메서드를 사용해 Flux객체를 만들 수 있다. subscribe 메서드를 사용해 stream형식으로 데이터가 흐르도록한다.


range 메서드를 통해 해당 범위의 숫자를 담는 Flux 객체를 만들 수 있다.


fromArray를 사용해 위와 같은 새로운 배열을 Flux객체로 만들 수도 있다.

 

즉, 이런 식의 코드도 가능하다.

핵심은 blocking 방식의 객체를 non-blocking 객체로 바꿀 수 있다라는 점이다.


fromIterable을 사용해 Iterable을 구현한 List와 같은 객체를 Flux객체로 만들 수 있다.


또한 자바8 이후로 굉장히 많이 사용되는 stream을 Flux객체로 바꿀 수도 있는데 위 상태에서는 현재 List이기 때문에 컴파일 오류가 발생한다. 따라서 .stream()을 추가해 stream으로 바꿔주면된다.


구구단 3단을 만드는 예시이다.

어떤 객체에 이벤트가 발생했을 때 어떤 처리를 하도록 만들 수 있음.


Flux.create()와 배압

  • Subscriber로부터 요청이 왔을 때(FluxSink#onRequest) 데이터를 전송하거나(pull 방식)
  • Subscriber의 요청에 상관없이 데이터를 전송하거나(push 방식)
  • 두 방식 모두 Subscriber가 요청한 개수보다 더 많은 데이터를 발생할 수 있다.
  • 이 코드는 Subscriber가 요청한 개수보다 3개 데이터를 더 발생한다. 이 경우 어떻게 될까?
  • 기본적으로 Flux.create()로 생성한 Flux는 초과로 발생한 데이터를 버퍼에 보관한다.
  • 버퍼에 보관된 데이터는 다음에 Subscriber가 데이터를 요청할 때 전달된다.
  • 요청보다 발생한 데이터가 많을 때 선택할 수 있는 처리 방식은 다음과 같다.
  • IGNORE : Subscriber의 요청 무시하고 발생(Subscriber의 큐가 다 차면 IllegalStateException 발생)
  • ERROR : 익셉션(IllegalStateException) 발생
  • DROP : Subscriber가 데이터를 받을 준비가 안 되어 있으면 데이터 발생 누락
  • LATEST : 마지막 신호만 Subscriber에 전달
  • BUFFER : 버퍼에 저장했다가 Subscriber 요청시 전달. 버퍼 제한이 없으므로 OutOfMemoryError 발생 가능
  • Flux.create()의 두 번째 인자로 처리 방식을 전달하면 된다.

요청이 왔을 때 객체를 생성함. Lazy로딩보다 더 Lazy하다고 생각하자.

 


빈 Flux 객체를 empty를 통해 생성한다.

 


Mono

Mono 또한, Reactive Streams의 Publisher 구현체이다.

 

Mono는 0~1개의 데이터를 처리한다.(건 by 건)

Mono

1이 담긴 Mono를 생성한다. doOnEach메서드를 사용해 발생한 Signal<Integer>을 리스트에 담아준다.

doOnEach는 데이터를 방출 할 때, 혹은 완료 에러가 발생했을 때의 고급 이벤트로써, 파라미터로 Signal 이 넘어온다. 이 신호에는 context 도 포함되어 있다. 주로 모니터링으로 사용한다고 한다

 

즉, 위 코드에서 onNext 한번, onComplete한번이 list에 담겨 size가 2이다.



 

반응형

'빗썸 테크 아카데미 > TIL' 카테고리의 다른 글

8일차  (0) 2021.08.30
7일차  (0) 2021.08.29
6일차  (0) 2021.08.27
5일차  (0) 2021.08.25
4일차  (0) 2021.08.24