Tech.io의 'Reactive Programming with Reactor 3'을 공부하면서 정리하는 글입니다.
위 다이어그램 중 볼륨 컨트롤은 리액티브 스트림의 Backpressure를 의미합니다.
Backpressure는 Subscriber
가 Publisher
에게 얼마만큼의 데이터를 처리할지 신호를 보내며, 이와 같은 방법으로 Publisher
의 데이터 생산을 제한합니다.
해당 컨트롤은 Subscription
레벨에서 처리됩니다.
구독(=subscribe()
)는 Subscription
객체를 반환합니다.
그리고 해당 객체의 request(long)
또는 cancel()
을 이용해서 데이터의 흐름을 제어하게 됩니다.
특히, request(Long.MAX_VALUE)
은, 무제한 데이터 요청을 의미하며, Publisher
는 최대한 빠르게 데이터를 전달하게 됩니다.
문제풀이
무제한 데이터 요청하는 경우
//========================================================================================
// TODO Create a StepVerifier that initially requests all values and expect 4 values to be received
StepVerifier requestAllExpectFour(Flux<User> flux) {
return StepVerifier.create(flux)
// .expectSubscription() // 이 코드를 추가하면 뭐가 달라질까?
.thenRequest(Long.MAX_VALUE)
.expectNextCount(4)
.expectComplete();
}
하나씩 데이터를 요청하는 경우
//========================================================================================
// TODO Create a StepVerifier that initially requests 1 value and expects User.SKYLER then requests another value and expects User.JESSE then stops verifying by cancelling the source
StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
return StepVerifier.create(flux)
.thenRequest(1)
.expectNext(User.SKYLER)
.thenRequest(1)
.expectNext(User.JESSE)
.thenCancel();
}
log()
함수를 이용하면 어떤 이벤트가 발생하는지 확인할 수 있습니다.
//========================================================================================
// TODO Return a Flux with all users stored in the repository that prints automatically logs for all Reactive Streams signals
ReactiveRepository<User> repository = new ReactiveUserRepository();
Flux<User> fluxWithLog() {
return repository
.findAll()
.log();
}
실행 결과
Compile and run your code
2021-03-15 21:55:23 [main] INFO reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator)
2021-03-15 21:55:23 [main] INFO reactor.Flux.Zip.1 - request(1)
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='swhite', firstname='Skyler', lastname='White'})
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - request(1)
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='jpinkman', firstname='Jesse', lastname='Pinkman'})
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - request(2)
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='wwhite', firstname='Walter', lastname='White'})
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='sgoodman', firstname='Saul', lastname='Goodman'})
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - onComplete()
doOn
으로 시작하는 부수효과(Side effect) 함수를 이용하면, 데이터를 수정하지 않고, 특정 코드를 실행할 수 있습니다.
주의!! 해당 함수에선 블럭킹 코드나 지연이 있는 작업을 수행하지 말아야 합니다.
//========================================================================================
// TODO Return a Flux with all users stored in the repository that prints "Starring:" on subscribe, "firstname lastname" for all values and "The end!" on complete
Flux<User> fluxWithDoOnPrintln() {
return repository
.findAll()
.doOnSubscribe(s -> System.out.println("Starring:"))
.doOnNext(u -> System.out.println(u.getFirstname() + " " + u.getLastname()))
.doOnComplete(() -> System.out.println("The end!"));
}