1. Operator
Reactive Streams 의 현재까지의 구조는 아래와 같다.
- Publisher 가 데이터를 만들어 Subscriber 에게 전달
- Publisher -> Data -> Subscriber
Operator들을 사용해서 데이터를 가공하는 플로우를 만든뒤에 Publisher.subscribe(Subscriber) 하여 플로우를 정의한다.
- Publisher -> [Data1] -> Operator1 -> [Data2] -> Operator2 -> [Data3] -> Subscriber
Delegate 하는 클래스를 기반으로 재호출하는 방식으로 Operator 을 만들 수 있다.
private static <T, R> Publisher<R> reducePub(Publisher<T> publisher, R init, BiFunction<R, T, R> bf) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> sub) {
publisher.subscribe(new DelegateSub<T, R>(sub) {
R result = init;
@Override
public void onNext(T integer) {
result = bf.apply(result, integer);
}
@Override
public void onComplete() {
sub.onNext(result);
sub.onComplete();
}
});
}
};
}
---
public class DelegateSub<T, R> implements Subscriber<T> {
Subscriber sub;
public DelegateSub(Subscriber<? super R> sub) {
this.sub = sub;
}
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(T param) {
sub.onNext(param);
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
}
2. Reactor
Flux
- Publisher 인터페이스를 구현한 Flux 를 Reactor 가 제공한다.
- .log()
- .reduce()
- .map() 등 유튜브에서 만든 함수들을 간단하게 사용할 수 있다.
public static void main(String[] args) {
Flux.<Integer>create(e -> {
e.next(1);
e.next(2);
e.next(3);
e.complete();
})
.log()
.map(s -> s * 10)
.log()
.subscribe(s -> System.out.println(s)); // onNext임
}
---
19:13:41.617 [main] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
19:13:41.618 [main] INFO reactor.Flux.Map.2 - onSubscribe(FluxMap.MapSubscriber)
19:13:41.619 [main] INFO reactor.Flux.Map.2 - request(unbounded)
19:13:41.619 [main] INFO reactor.Flux.Create.1 - request(unbounded)
19:13:41.621 [main] INFO reactor.Flux.Create.1 - onNext(1)
19:13:41.621 [main] INFO reactor.Flux.Map.2 - onNext(10)
10
19:13:41.622 [main] INFO reactor.Flux.Create.1 - onNext(2)
19:13:41.622 [main] INFO reactor.Flux.Map.2 - onNext(20)
20
19:13:41.622 [main] INFO reactor.Flux.Create.1 - onNext(3)
19:13:41.622 [main] INFO reactor.Flux.Map.2 - onNext(30)
30
19:13:41.622 [main] INFO reactor.Flux.Create.1 - onComplete()
19:13:41.622 [main] INFO reactor.Flux.Map.2 - onComplete()
'토비의봄' 카테고리의 다른 글
5. 자바와 스프링의 비동기 개발기술 (0) | 2021.03.05 |
---|---|
4.3. Reactive Streams (0) | 2021.03.03 |
4.1. Reactive Streams (0) | 2021.02.28 |
3.2. Generics (0) | 2021.02.25 |
3.1. Generics (0) | 2021.02.24 |