티스토리 뷰

learning

요청(Request, 7/12)

눈침침 2021. 3. 16. 12:52

Tech.io의 'Reactive Programming with Reactor 3'을 공부하면서 정리하는 글입니다.

위 다이어그램 중 볼륨 컨트롤은 리액티브 스트림의 Backpressure를 의미합니다.
Backpressure는 SubscriberPublisher에게 얼마만큼의 데이터를 처리할지 신호를 보내며, 이와 같은 방법으로 Publisher의 데이터 생산을 제한합니다.

해당 컨트롤은 Subscription 레벨에서 처리됩니다.
구독(=subscribe())는 Subscription 객체를 반환합니다.

그리고 해당 객체의 request(long)또는 cancel()을 이용해서 데이터의 흐름을 제어하게 됩니다.

특히, request(Long.MAX_VALUE)은, 무제한 데이터 요청을 의미하며, Publisher는 최대한 빠르게 데이터를 전달하게 됩니다.

문제풀이

무제한 데이터 요청하는 경우

//========================================================================================

 // TODO Create a StepVerifier that initially requests all values and expect 4 values to be received
 StepVerifier requestAllExpectFour(Flux<User> flux) {
   return StepVerifier.create(flux)
    // .expectSubscription() // 이 코드를 추가하면 뭐가 달라질까?
    .thenRequest(Long.MAX_VALUE)
    .expectNextCount(4)
    .expectComplete();
 }

하나씩 데이터를 요청하는 경우

//========================================================================================

 // TODO Create a StepVerifier that initially requests 1 value and expects User.SKYLER then requests another value and expects User.JESSE then stops verifying by cancelling the source
 StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
   return StepVerifier.create(flux)
     .thenRequest(1)
     .expectNext(User.SKYLER)
     .thenRequest(1)
     .expectNext(User.JESSE)
     .thenCancel();
 }

log() 함수를 이용하면 어떤 이벤트가 발생하는지 확인할 수 있습니다.

//========================================================================================

 // TODO Return a Flux with all users stored in the repository that prints automatically logs for all Reactive Streams signals
 ReactiveRepository<User> repository = new ReactiveUserRepository();

 Flux<User> fluxWithLog() {
   return repository
     .findAll()
     .log();
 }

실행 결과

Compile and run your code
2021-03-15 21:55:23 [main] INFO  reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator)
2021-03-15 21:55:23 [main] INFO  reactor.Flux.Zip.1 - request(1)
2021-03-15 21:55:23 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='swhite', firstname='Skyler', lastname='White'})
2021-03-15 21:55:23 [parallel-1] INFO  reactor.Flux.Zip.1 - request(1)
2021-03-15 21:55:23 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='jpinkman', firstname='Jesse', lastname='Pinkman'})
2021-03-15 21:55:23 [parallel-1] INFO  reactor.Flux.Zip.1 - request(2)
2021-03-15 21:55:23 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='wwhite', firstname='Walter', lastname='White'})
2021-03-15 21:55:23 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='sgoodman', firstname='Saul', lastname='Goodman'})
2021-03-15 21:55:23 [parallel-1] INFO  reactor.Flux.Zip.1 - onComplete()

doOn으로 시작하는 부수효과(Side effect) 함수를 이용하면, 데이터를 수정하지 않고, 특정 코드를 실행할 수 있습니다.

주의!! 해당 함수에선 블럭킹 코드나 지연이 있는 작업을 수행하지 말아야 합니다.


//========================================================================================

 // TODO Return a Flux with all users stored in the repository that prints "Starring:" on subscribe, "firstname lastname" for all values and "The end!" on complete
 Flux<User> fluxWithDoOnPrintln() {
   return repository
     .findAll()
     .doOnSubscribe(s -> System.out.println("Starring:"))
     .doOnNext(u -> System.out.println(u.getFirstname() + " " + u.getLastname()))
     .doOnComplete(() -> System.out.println("The end!"));
 }
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/03   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31
글 보관함