1. Iterable
Iterable 구조
Iterable을 구현하면 for-each loop에 타겟이 될 수 있다.
public static void main(String[] args) {
Iterable<Integer> iter = Arrays.asList(1, 2, 3, 4, 5);
// for-each loop
for(Integer i : iter) {
System.out.println(i);
}
}
Iterable Interface
- 2개의 default 메소드, 1개의 메소드를 갖고있다.
- Iterator : Iterable Interface의 원소 순회를 하나씩 하기위해서 사용하는 도구
public interface Iterable<T> {
Iterator<T> iterator();
default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}
default Spliterator<T> spliterator() {
return Spliterators.spliteratorUnknownSize(iterator(), 0);
}
}
2. Iterable 구현체 만들기
위에서 본 것처럼,
- Iterable의 구현체에 대해서는 한 개의 메소드만 구현하면 되며
- 그 메소드에 리턴타입이 Iterator 이므로 Iterator를 구현하여 반환하면 된다.
1부터 10까지 for-each 구문을 사용할 수 있는 Iterable의 구현체 코드
public static void main(String[] args) {
Iterable<Integer> iter = () -> new Iterator<Integer>() {
int i = 0;
final static int MAX = 10;
public boolean hasNext() {
return i < MAX;
}
public Integer next() {
return ++i;
}
};
for(Integer i : iter) { // for-each loop
System.out.println(i);
}
}
3. Observable
Observable
- Event/Data 를 Observer한테 던지는 주체
Observer
- Observable 에 의해 등록되며, Event/Data 를 받아서 handling 함
Observable과 Observer 를 만들어내보자.
- 위의 Iterable 과 기능은 같으나 뭔가 다르다.
static class IntObservable extends Observable implements Runnable {
@Override
public void run() {
for(int i = 1 ; i <= 10 ; i++) {
setChanged(); // 변화가 있음을 알림
notifyObservers(i); // 옵저버에게 알려줌
}
}
}
public static void main(String[] args) {
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println(arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob); // 이제부터는 Observable 이 던지는 이벤트는 Observer 가 받게 됨
io.run();
}
별개의 쓰레드에서 동작하게 해보자.
- 매우 쉽게 변경할 수 있다.
static class IntObservable extends Observable implements Runnable {
@Override
public void run() {
for(int i = 1 ; i <= 10 ; i++) {
setChanged(); // 변화가 있음을 알림
notifyObservers(i); // 옵저버에게 알려줌
}
}
}
public static void main(String[] args) {
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println(Thread.currentThread().getName() + " " + arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob); // 이제부터는 Observable 이 던지는 이벤트는 Observer 가 받게 됨
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(io);
System.out.println(Thread.currentThread().getName() + " EXIT");
es.shutdown();
}
4. Duality
Iterable 과 Observable의 duality
- Iterable : Pull (다음 해야할 이벤트를 가져와서 실행) / int i = it.next();
- Observable : Push (다음 해야할 이벤트를 밀어넣어서 실행하도록 함) / notifyObservers(i);
5. Reactive
옵저버 패턴의 한계
- Complete 의 개념이 없다.
- Error 처리가 어렵다.
- 비동기적으로 구현했을 때 받은 예외를 처리하거나, 재시도로직을 하는 처리가 어렵다.
Reactive Stream 는 이를 어떻게처리했을까
6. Publisher / Subscriber
Publisher : Observable과 유사하다.
- Subscriber 에게 연속된 요소들을 제공
Subscriber : Observer와 유사하다.
Publisher.subscribe(Subscriber) 를 호출할 때, 아래의 프로토콜을 따라야 한다.
- onSubscribe (subscribe 가 시작됐다.) : 반드시 호출돼야함
- onNext* : 0~여러번 호출 가능
- (onError | onComplete) : 둘 중 하나 배타적으로 호출 가능
Publisher
- Data Stream 을 계속 지속적으로 생산한다.
Subscriber
- 최종적으로 Publisher 가 보낸 데이터를 사용한다.
- Publisher 로 부터 데이터를 받겠다는 과정을 거친다. (subscribe)
- Publisher.subscribe(Subscriber)
Subscrition
- subscriber 는 Subscrition을 통해 몇개의 데이터를 처리할 지 알린다. (back pressure)
- 이에대해 publisher은 그만큼의 데이터를 전송하는 Event를 발생시켜 Subscriber 가 처리하도록 한다.
public class PubSub {
public static void main(String[] args) throws InterruptedException {
Iterable<Integer> iterable = Arrays.asList(1, 2, 3, 4, 5);
ExecutorService es = Executors.newSingleThreadExecutor();
// Iterable 을 던지는 Publisher 를 만들어보자
Publisher publisher = new Publisher() {
Iterator<Integer> it = iterable.iterator();
@Override
public void subscribe(Subscriber subscriber) {
// onSubscribe 는 무조건 호출돼야함
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
es.execute(() -> {
int i = 0;
try {
while (i++ < n) {
if (it.hasNext()) {
subscriber.onNext(it.next());
} else {
// 성공 onComplete
subscriber.onComplete();
break;
}
}
} catch (RuntimeException e) {
// 에러처리 onError
subscriber.onError(e);
}
});
}
@Override
public void cancel() {
}
});
}
};
Subscriber subscriber = new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("onSubscribe");
this.subscription = subscription;
this.subscription.request(1);
}
// 데이터를 하나씩 끌어오는 것
@Override
public void onNext(Integer item) {
System.out.println("onNext " + item);
this.subscription.request(1);
}
// Exception 에 해당하는 오브젝트를 만들어서 onError 로 넘겨줘
// try - catch 가 필요없음
@Override
public void onError(Throwable t) {
System.out.println("onError");
}
// 완료가 났을 때의 표준
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
publisher.subscribe(subscriber);
es.awaitTermination(10, TimeUnit.HOURS);
es.shutdown();
}
}
'토비의봄' 카테고리의 다른 글
4.3. Reactive Streams (0) | 2021.03.03 |
---|---|
4.2. Reactive Streams (0) | 2021.02.28 |
3.2. Generics (0) | 2021.02.25 |
3.1. Generics (0) | 2021.02.24 |
2. 슈퍼 타입 토큰 (0) | 2021.02.20 |