본문 바로가기
토비의봄

4.3. Reactive Streams

by 이석준석이 2021. 3. 3.

1. Single Thread

 

아래의 코드는 하나의 쓰레드(main)에서 publisher 과 subscriber 가 모두 수행된다.

 

옵저버 패턴을 쓸 때의 장점은

  • 이벤트 발생은 백그라운드에서
  • 실행하는 부분은 이벤트 발생시 (non-blocking) 수행 

하지만 아래의 코드에서는 해당 장점을 찾을 수 없다.

@Slf4j
public class SchedulerEx {
    public static void main(String[] args) {
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    sub.onNext(1);
                    sub.onNext(2);
                    sub.onNext(3);
                    sub.onNext(4);
                    sub.onNext(5);
                    sub.onComplete();
                }

                @Override
                public void cancel() {

                }
            });
        };

        pub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                log.debug("onNext: {}", integer);
            }

            @Override
            public void onError(Throwable t) {
                log.debug("onError: {}", t);
            }

            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });
    }
}

2. 비동기 Operator 생성 (SubscribeOn)

 

subscribeOn Operator

  • subscribe, onSubscribe, request 모두 특정한 스케줄러에 의해서 실행되게 하는 오퍼레이터이다.

먼저 간단한 Operator를 생성한다.

  • 역할은 위에서 생성한 Publisher과 Subscriber를 연결하는 간단한 역할을 한다.
@Slf4j
public class SchedulerEx {
    public static void main(String[] args) {
		...
        // Operator
        Publisher<Integer> subOnPub = sub -> {
            // 위에서 만든 publisher 가 subscribe 하도록, 현재는 아래의 로그찍는 subscriber
            pub.subscribe(sub);
        };

        subOnPub.subscribe(new Subscriber<Integer>() {
		...
    }
}

 

쓰레드를 하나 만들어 그 내부에서 pub, sub 이 동작하도록 구현한다.

  • main 쓰레드를 block 하지 않는다.
@Slf4j
public class SchedulerEx {
    public static void main(String[] args) {
		...
        // Operator
        Publisher<Integer> subOnPub = sub -> {
            // 한번에 한개의 쓰레드가 동작하게 해주는 Thread Pool을 보장
            ExecutorService es = Executors.newSingleThreadExecutor();
            // 만든 쓰레드에서 실행시킨다.
            es.execute(
                    // 위에서 만든 publisher 가 subscribe 하도록, 현재는 아래의 로그찍는 subscriber
                    () -> pub.subscribe(sub)
            );
        };
		...
    }
}

3. 비동기 Operator 생성 (PublishOn)

 

publishOn Operator

  • onNext, onComplete, onError는 publishOn에 정의한 쓰레드(스케줄러) 를 통해 동작하도록 하는 오퍼레이터이다.
  • subscriber 가 consume 이 상대적으로 느린 경우 사용한다.

 

Delegating publisher를 만든다.

@Slf4j
public class SchedulerEx {
    public static void main(String[] args) {
		...
        Publisher<Integer> pubOnPub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    sub.onSubscribe(s);
                }

                @Override
                public void onNext(Integer integer) {
                    sub.onNext(integer);
                }

                @Override
                public void onError(Throwable t) {
                    sub.onError(t);
                }

                @Override
                public void onComplete() {
                    sub.onComplete();
                }
            });
        };

        pubOnPub.subscribe(new Subscriber<Integer>() {
		...
    }
}

onNext, onComplete, onError 에 대해서는 새로운 쓰레드에서 수행될 수 있도록 쓰레드를 추가한다.

@Slf4j
public class SchedulerEx {
    public static void main(String[] args) {
        ...

        Publisher<Integer> pubOnPub = sub -> {
            subOnPub.subscribe(new Subscriber<Integer>() {
                ExecutorService es = Executors.newSingleThreadExecutor();
                @Override
                public void onSubscribe(Subscription s) {
                    sub.onSubscribe(s);
                }

                @Override
                public void onNext(Integer integer) {
                    es.execute(() -> sub.onNext(integer));
                }

                @Override
                public void onError(Throwable t) {
                    es.execute(() -> sub.onError(t));
                }

                @Override
                public void onComplete() {
                    es.execute(() -> sub.onComplete());
                }
            });
        };

       	...
    }
}

4. Flux subscribeOn

 

Reactor 의 Flux를 통해 subscribeOn 을 만들고 수행해보자

public class FluxScEx {
    public static void main(String[] args) {
        Flux.range(1, 10)
                .log()
                .subscribeOn(Schedulers.newSingle("sub"))
                .subscribe();
    }
}

---

09:28:37.916 [sub-1] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | request(unbounded)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(1)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(2)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(3)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(4)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(5)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(6)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(7)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(8)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(9)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(10)
09:28:37.922 [sub-1] INFO reactor.Flux.Range.1 - | onComplete()

5. Flux publishOn

 

Reactor의 Flux를 통해 publishOn 을 만들고 수행해보자

public class FluxScEx {
    public static void main(String[] args) {
        Flux.range(1, 10)
                .log()
                .publishOn(Schedulers.newSingle("pub"))
                .subscribe();
    }
}
 

6. Flux.take()

 

take() Operator 은 몇개까지의 데이터를 받으면 종료하도록 하는 오퍼레이터다.

@Slf4j
public class FluxScEx {
    public static void main(String[] args) {
        // daemon 쓰레드로 생성
        // daemon 쓰레드만 있는 경우에는 자바가 메인쓰레드가 종료되면 바로 종료되기 때문에 아무일도 생기지 않는다.
        Flux.interval(Duration.ofMillis(500))
                .take(10) // 10개의 데이터를 받으면 종료
                .subscribe(s -> log.debug("onNext:{}", s));

    }
}

 

이를 일반적인 Publisher / Subscriber를 사용하여 구현하면 아래와 같이 구현할 수 있다.

  • Subscription.cancel() 을 이용하여 중단시키는 방식으로 구현
@Slf4j
public class IntervalEx {
    public static void main(String[] args) {
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                int no = 0;
                boolean cancelled = false;

                @Override
                public void request(long n) {
                    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
                    exec.scheduleAtFixedRate(() -> {
                        if (cancelled) {
                            exec.shutdown();
                            return;
                        }
                        sub.onNext(no++);
                    }, 0, 300, TimeUnit.MILLISECONDS);
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        };

        Publisher<Integer> takePub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {

                int count = 0;
                Subscription subsc;

                @Override
                public void onSubscribe(Subscription s) {
                    subsc = s;
                    sub.onSubscribe(s);
                }

                @Override
                public void onNext(Integer integer) {
                    sub.onNext(integer);
                    if(++count >= 5) {
                        subsc.cancel();
                    }
                }

                @Override
                public void onError(Throwable t) {
                    sub.onError(t);
                }

                @Override
                public void onComplete() {
                    sub.onComplete();
                }
            });
        };

        takePub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                log.debug("onNext: {}", integer);
            }

            @Override
            public void onError(Throwable t) {
                log.debug("onError: {}", t);
            }

            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });
    }
}

'토비의봄' 카테고리의 다른 글

6. 비동기 RestTemplate, 비동기 MVC/Servlet  (0) 2021.03.06
5. 자바와 스프링의 비동기 개발기술  (0) 2021.03.05
4.2. Reactive Streams  (0) 2021.02.28
4.1. Reactive Streams  (0) 2021.02.28
3.2. Generics  (0) 2021.02.25