티스토리 뷰
Tech.io의 'Reactive Programming with Reactor 3'을 공부하면서 정리하는 글입니다.
위 다이어그램 중 볼륨 컨트롤은 리액티브 스트림의 Backpressure를 의미합니다.
Backpressure는 Subscriber
가 Publisher
에게 얼마만큼의 데이터를 처리할지 신호를 보내며, 이와 같은 방법으로 Publisher
의 데이터 생산을 제한합니다.
해당 컨트롤은 Subscription
레벨에서 처리됩니다.
구독(=subscribe()
)는 Subscription
객체를 반환합니다.
그리고 해당 객체의 request(long)
또는 cancel()
을 이용해서 데이터의 흐름을 제어하게 됩니다.
특히, request(Long.MAX_VALUE)
은, 무제한 데이터 요청을 의미하며, Publisher
는 최대한 빠르게 데이터를 전달하게 됩니다.
문제풀이
무제한 데이터 요청하는 경우
//========================================================================================
// TODO Create a StepVerifier that initially requests all values and expect 4 values to be received
StepVerifier requestAllExpectFour(Flux<User> flux) {
return StepVerifier.create(flux)
// .expectSubscription() // 이 코드를 추가하면 뭐가 달라질까?
.thenRequest(Long.MAX_VALUE)
.expectNextCount(4)
.expectComplete();
}
하나씩 데이터를 요청하는 경우
//========================================================================================
// TODO Create a StepVerifier that initially requests 1 value and expects User.SKYLER then requests another value and expects User.JESSE then stops verifying by cancelling the source
StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
return StepVerifier.create(flux)
.thenRequest(1)
.expectNext(User.SKYLER)
.thenRequest(1)
.expectNext(User.JESSE)
.thenCancel();
}
log()
함수를 이용하면 어떤 이벤트가 발생하는지 확인할 수 있습니다.
//========================================================================================
// TODO Return a Flux with all users stored in the repository that prints automatically logs for all Reactive Streams signals
ReactiveRepository<User> repository = new ReactiveUserRepository();
Flux<User> fluxWithLog() {
return repository
.findAll()
.log();
}
실행 결과
Compile and run your code
2021-03-15 21:55:23 [main] INFO reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator)
2021-03-15 21:55:23 [main] INFO reactor.Flux.Zip.1 - request(1)
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='swhite', firstname='Skyler', lastname='White'})
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - request(1)
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='jpinkman', firstname='Jesse', lastname='Pinkman'})
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - request(2)
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='wwhite', firstname='Walter', lastname='White'})
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='sgoodman', firstname='Saul', lastname='Goodman'})
2021-03-15 21:55:23 [parallel-1] INFO reactor.Flux.Zip.1 - onComplete()
doOn
으로 시작하는 부수효과(Side effect) 함수를 이용하면, 데이터를 수정하지 않고, 특정 코드를 실행할 수 있습니다.
주의!! 해당 함수에선 블럭킹 코드나 지연이 있는 작업을 수행하지 말아야 합니다.
//========================================================================================
// TODO Return a Flux with all users stored in the repository that prints "Starring:" on subscribe, "firstname lastname" for all values and "The end!" on complete
Flux<User> fluxWithDoOnPrintln() {
return repository
.findAll()
.doOnSubscribe(s -> System.out.println("Starring:"))
.doOnNext(u -> System.out.println(u.getFirstname() + " " + u.getLastname()))
.doOnComplete(() -> System.out.println("The end!"));
}
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- 빔
- Eclipse
- Node.js
- VI
- Widget
- Git
- 윈도우즈
- dojo
- gvim
- tipoftheday
- Java
- unit test
- maven
- intellij
- VIM
- Coffeescript
- 리액터
- console
- Configuration
- 스프링인액션
- 커피스크립트
- Commonjs
- React
- 자바스크립트
- REACTOR
- Windows
- 자바
- JavaScript
- 리액티브프로그래밍
- reactiveprogramming
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
31 |
글 보관함