본문 바로가기
토비의봄

4.2. Reactive Streams

by 이석준석이 2021. 2. 28.

1. Operator

 

Reactive Streams 의 현재까지의 구조는 아래와 같다.

  • Publisher 가 데이터를 만들어 Subscriber 에게 전달
  • Publisher -> Data -> Subscriber

 

Operator들을 사용해서 데이터를 가공하는 플로우를 만든뒤에 Publisher.subscribe(Subscriber) 하여 플로우를 정의한다.

  • Publisher -> [Data1] -> Operator1 -> [Data2] -> Operator2 -> [Data3] -> Subscriber

Delegate 하는 클래스를 기반으로 재호출하는 방식으로 Operator 을 만들 수 있다.

private static <T, R> Publisher<R> reducePub(Publisher<T> publisher, R init, BiFunction<R, T, R> bf) {
    return new Publisher<R>() {
        @Override
        public void subscribe(Subscriber<? super R> sub) {
            publisher.subscribe(new DelegateSub<T, R>(sub) {
                R result = init;

                @Override
                public void onNext(T integer) {
                    result = bf.apply(result, integer);
                }

                @Override
                public void onComplete() {
                    sub.onNext(result);
                    sub.onComplete();
                }
            });
        }
    };
}

---

public class DelegateSub<T, R> implements Subscriber<T> {
    Subscriber sub;

    public DelegateSub(Subscriber<? super R> sub) {
        this.sub = sub;
    }

    @Override
    public void onSubscribe(Subscription s) {
        sub.onSubscribe(s);
    }

    @Override
    public void onNext(T param) {
        sub.onNext(param);
    }

    @Override
    public void onError(Throwable t) {
        sub.onError(t);
    }

    @Override
    public void onComplete() {
        sub.onComplete();
    }
}

2. Reactor

 

Flux

  • Publisher 인터페이스를 구현한 Flux 를 Reactor 가 제공한다.
    • .log()
    • .reduce()
    • .map() 등 유튜브에서 만든 함수들을 간단하게 사용할 수 있다.

public static void main(String[] args) {
    Flux.<Integer>create(e -> {
        e.next(1);
        e.next(2);
        e.next(3);
        e.complete();
    })
    .log()
    .map(s -> s * 10)
    .log()
    .subscribe(s -> System.out.println(s)); // onNext임
}
---
19:13:41.617 [main] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
19:13:41.618 [main] INFO reactor.Flux.Map.2 - onSubscribe(FluxMap.MapSubscriber)
19:13:41.619 [main] INFO reactor.Flux.Map.2 - request(unbounded)
19:13:41.619 [main] INFO reactor.Flux.Create.1 - request(unbounded)
19:13:41.621 [main] INFO reactor.Flux.Create.1 - onNext(1)
19:13:41.621 [main] INFO reactor.Flux.Map.2 - onNext(10)
10
19:13:41.622 [main] INFO reactor.Flux.Create.1 - onNext(2)
19:13:41.622 [main] INFO reactor.Flux.Map.2 - onNext(20)
20
19:13:41.622 [main] INFO reactor.Flux.Create.1 - onNext(3)
19:13:41.622 [main] INFO reactor.Flux.Map.2 - onNext(30)
30
19:13:41.622 [main] INFO reactor.Flux.Create.1 - onComplete()
19:13:41.622 [main] INFO reactor.Flux.Map.2 - onComplete()

 

'토비의봄' 카테고리의 다른 글

5. 자바와 스프링의 비동기 개발기술  (0) 2021.03.05
4.3. Reactive Streams  (0) 2021.03.03
4.1. Reactive Streams  (0) 2021.02.28
3.2. Generics  (0) 2021.02.25
3.1. Generics  (0) 2021.02.24