본문 바로가기
토비의봄

4.1. Reactive Streams

by 이석준석이 2021. 2. 28.

1. Iterable

 

Iterable 구조

Iterable, Collection, List 의 구조

Iterable을 구현하면 for-each loop에 타겟이 될 수 있다.

public static void main(String[] args) {
    Iterable<Integer> iter = Arrays.asList(1, 2, 3, 4, 5);
    // for-each loop
    for(Integer i : iter) {
        System.out.println(i);
    }
}

Iterable Interface

  • 2개의 default 메소드, 1개의 메소드를 갖고있다.
  • Iterator : Iterable Interface의 원소 순회를 하나씩 하기위해서 사용하는 도구
public interface Iterable<T> {
    Iterator<T> iterator();

    default void forEach(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        for (T t : this) {
            action.accept(t);
        }
    }

    default Spliterator<T> spliterator() {
        return Spliterators.spliteratorUnknownSize(iterator(), 0);
    }
}

2. Iterable 구현체 만들기

 

위에서 본 것처럼,

  • Iterable의 구현체에 대해서는 한 개의 메소드만 구현하면 되며 
  • 메소드에 리턴타입이 Iterator 이므로 Iterator를 구현하여 반환하면 된다.

 

1부터 10까지 for-each 구문을 사용할 수 있는 Iterable의 구현체 코드

public static void main(String[] args) {
    Iterable<Integer> iter = () -> new Iterator<Integer>() {
        int i = 0;
        final static int MAX = 10;

        public boolean hasNext() {
            return i < MAX;
        }

        public Integer next() {
            return ++i;
        }
    };

    for(Integer i : iter) { // for-each loop
        System.out.println(i);
    }
}

3. Observable

 

Observable

  • Event/Data 를 Observer한테 던지는 주체

Observer

  • Observable 에 의해 등록되며, Event/Data 를 받아서 handling 함

Observable과 Observer 를 만들어내보자.

  • 위의 Iterable 과 기능은 같으나 뭔가 다르다.
static class IntObservable extends Observable implements Runnable {
    @Override
    public void run() {
        for(int i = 1 ; i <= 10 ; i++) {
            setChanged(); // 변화가 있음을 알림
            notifyObservers(i); // 옵저버에게 알려줌
        }
    }
}

public static void main(String[] args) {
    Observer ob = new Observer() {
        @Override
        public void update(Observable o, Object arg) {
            System.out.println(arg);
        }
    };

    IntObservable io = new IntObservable();
    io.addObserver(ob); // 이제부터는 Observable 이 던지는 이벤트는 Observer 가 받게 됨

    io.run();
}

별개의 쓰레드에서 동작하게 해보자.

  • 매우 쉽게 변경할 수 있다.
static class IntObservable extends Observable implements Runnable {
    @Override
    public void run() {
        for(int i = 1 ; i <= 10 ; i++) {
            setChanged(); // 변화가 있음을 알림
            notifyObservers(i); // 옵저버에게 알려줌
        }
    }
}

public static void main(String[] args) {
    Observer ob = new Observer() {
        @Override
        public void update(Observable o, Object arg) {
            System.out.println(Thread.currentThread().getName() + " " + arg);
        }
    };

    IntObservable io = new IntObservable();
    io.addObserver(ob); // 이제부터는 Observable 이 던지는 이벤트는 Observer 가 받게 됨

    ExecutorService es = Executors.newSingleThreadExecutor();
    es.execute(io);
    System.out.println(Thread.currentThread().getName() + " EXIT");
    es.shutdown();
}

4. Duality

 

Iterable 과 Observable의 duality

  • Iterable : Pull (다음 해야할 이벤트를 가져와서 실행) / int i = it.next();
  • Observable : Push (다음 해야할 이벤트를 밀어넣어서 실행하도록 함) / notifyObservers(i); 

5. Reactive

 

옵저버 패턴의 한계

  1. Complete 의 개념이 없다. 
  2. Error 처리가 어렵다.
    • 비동기적으로 구현했을 때 받은 예외를 처리하거나, 재시도로직을 하는 처리가 어렵다.

 

Reactive Stream 는 이를 어떻게처리했을까


6. Publisher / Subscriber

 

Publisher : Observable과 유사하다.

  • Subscriber 에게 연속된 요소들을 제공

Subscriber : Observer와 유사하다.


Publisher.subscribe(Subscriber) 를 호출할 때, 아래의 프로토콜을 따라야 한다.

Publisher.subscribe(Subscriber) protocol

  • onSubscribe (subscribe 가 시작됐다.) : 반드시 호출돼야함
  • onNext* : 0~여러번 호출 가능
  • (onError | onComplete) : 둘 중 하나 배타적으로 호출 가능

Publisher

  • Data Stream 을 계속 지속적으로 생산한다.

Subscriber

  • 최종적으로 Publisher 가 보낸 데이터를 사용한다.
  • Publisher 로 부터 데이터를 받겠다는 과정을 거친다. (subscribe)
    • Publisher.subscribe(Subscriber)

Subscrition

  • subscriber 는 Subscrition을 통해 몇개의 데이터를 처리할 지 알린다. (back pressure)
  • 이에대해 publisher은 그만큼의 데이터를 전송하는 Event를 발생시켜 Subscriber 가 처리하도록 한다.

public class PubSub {
    public static void main(String[] args) throws InterruptedException {
        Iterable<Integer> iterable = Arrays.asList(1, 2, 3, 4, 5);
        ExecutorService es = Executors.newSingleThreadExecutor();

        // Iterable 을 던지는 Publisher 를 만들어보자
        Publisher publisher = new Publisher() {
            Iterator<Integer> it = iterable.iterator();

            @Override
            public void subscribe(Subscriber subscriber) {
                // onSubscribe 는 무조건 호출돼야함
                subscriber.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        es.execute(() -> {
                            int i = 0;
                            try {
                                while (i++ < n) {
                                    if (it.hasNext()) {
                                        subscriber.onNext(it.next());
                                    } else {
                                        // 성공 onComplete
                                        subscriber.onComplete();
                                        break;
                                    }
                                }
                            } catch (RuntimeException e) {
                                // 에러처리 onError
                                subscriber.onError(e);
                            }
                        });
                    }

                    @Override
                    public void cancel() {

                    }
                });
            }
        };

        Subscriber subscriber = new Subscriber<Integer>() {
            Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("onSubscribe");
                this.subscription = subscription;
                this.subscription.request(1);
            }

            // 데이터를 하나씩 끌어오는 것
            @Override
            public void onNext(Integer item) {
                System.out.println("onNext " + item);
                this.subscription.request(1);
            }

            // Exception 에 해당하는 오브젝트를 만들어서 onError 로 넘겨줘
            // try - catch 가 필요없음
            @Override
            public void onError(Throwable t) {
                System.out.println("onError");
            }

            // 완료가 났을 때의 표준
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        publisher.subscribe(subscriber);
        es.awaitTermination(10, TimeUnit.HOURS);
        es.shutdown();
    }
}

'토비의봄' 카테고리의 다른 글

4.3. Reactive Streams  (0) 2021.03.03
4.2. Reactive Streams  (0) 2021.02.28
3.2. Generics  (0) 2021.02.25
3.1. Generics  (0) 2021.02.24
2. 슈퍼 타입 토큰  (0) 2021.02.20