1. Single Thread
아래의 코드는 하나의 쓰레드(main)에서 publisher 과 subscriber 가 모두 수행된다.
옵저버 패턴을 쓸 때의 장점은
- 이벤트 발생은 백그라운드에서
- 실행하는 부분은 이벤트 발생시 (non-blocking) 수행
하지만 아래의 코드에서는 해당 장점을 찾을 수 없다.
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}
@Override
public void cancel() {
}
});
};
pub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.debug("onNext: {}", integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError: {}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
});
}
}
2. 비동기 Operator 생성 (SubscribeOn)
subscribeOn Operator
- subscribe, onSubscribe, request 모두 특정한 스케줄러에 의해서 실행되게 하는 오퍼레이터이다.
먼저 간단한 Operator를 생성한다.
- 역할은 위에서 생성한 Publisher과 Subscriber를 연결하는 간단한 역할을 한다.
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
...
// Operator
Publisher<Integer> subOnPub = sub -> {
// 위에서 만든 publisher 가 subscribe 하도록, 현재는 아래의 로그찍는 subscriber
pub.subscribe(sub);
};
subOnPub.subscribe(new Subscriber<Integer>() {
...
}
}
쓰레드를 하나 만들어 그 내부에서 pub, sub 이 동작하도록 구현한다.
- main 쓰레드를 block 하지 않는다.
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
...
// Operator
Publisher<Integer> subOnPub = sub -> {
// 한번에 한개의 쓰레드가 동작하게 해주는 Thread Pool을 보장
ExecutorService es = Executors.newSingleThreadExecutor();
// 만든 쓰레드에서 실행시킨다.
es.execute(
// 위에서 만든 publisher 가 subscribe 하도록, 현재는 아래의 로그찍는 subscriber
() -> pub.subscribe(sub)
);
};
...
}
}
3. 비동기 Operator 생성 (PublishOn)
publishOn Operator
- onNext, onComplete, onError는 publishOn에 정의한 쓰레드(스케줄러) 를 통해 동작하도록 하는 오퍼레이터이다.
- subscriber 가 consume 이 상대적으로 느린 경우 사용한다.
Delegating publisher를 만든다.
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
...
Publisher<Integer> pubOnPub = sub -> {
pub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(Integer integer) {
sub.onNext(integer);
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
});
};
pubOnPub.subscribe(new Subscriber<Integer>() {
...
}
}
onNext, onComplete, onError 에 대해서는 새로운 쓰레드에서 수행될 수 있도록 쓰레드를 추가한다.
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
...
Publisher<Integer> pubOnPub = sub -> {
subOnPub.subscribe(new Subscriber<Integer>() {
ExecutorService es = Executors.newSingleThreadExecutor();
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(Integer integer) {
es.execute(() -> sub.onNext(integer));
}
@Override
public void onError(Throwable t) {
es.execute(() -> sub.onError(t));
}
@Override
public void onComplete() {
es.execute(() -> sub.onComplete());
}
});
};
...
}
}
4. Flux subscribeOn
Reactor 의 Flux를 통해 subscribeOn 을 만들고 수행해보자
public class FluxScEx {
public static void main(String[] args) {
Flux.range(1, 10)
.log()
.subscribeOn(Schedulers.newSingle("sub"))
.subscribe();
}
}
---
09:28:37.916 [sub-1] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | request(unbounded)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(1)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(2)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(3)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(4)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(5)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(6)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(7)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(8)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(9)
09:28:37.921 [sub-1] INFO reactor.Flux.Range.1 - | onNext(10)
09:28:37.922 [sub-1] INFO reactor.Flux.Range.1 - | onComplete()
5. Flux publishOn
Reactor의 Flux를 통해 publishOn 을 만들고 수행해보자
public class FluxScEx {
public static void main(String[] args) {
Flux.range(1, 10)
.log()
.publishOn(Schedulers.newSingle("pub"))
.subscribe();
}
}
6. Flux.take()
take() Operator 은 몇개까지의 데이터를 받으면 종료하도록 하는 오퍼레이터다.
@Slf4j
public class FluxScEx {
public static void main(String[] args) {
// daemon 쓰레드로 생성
// daemon 쓰레드만 있는 경우에는 자바가 메인쓰레드가 종료되면 바로 종료되기 때문에 아무일도 생기지 않는다.
Flux.interval(Duration.ofMillis(500))
.take(10) // 10개의 데이터를 받으면 종료
.subscribe(s -> log.debug("onNext:{}", s));
}
}
이를 일반적인 Publisher / Subscriber를 사용하여 구현하면 아래와 같이 구현할 수 있다.
- Subscription.cancel() 을 이용하여 중단시키는 방식으로 구현
@Slf4j
public class IntervalEx {
public static void main(String[] args) {
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
int no = 0;
boolean cancelled = false;
@Override
public void request(long n) {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
exec.scheduleAtFixedRate(() -> {
if (cancelled) {
exec.shutdown();
return;
}
sub.onNext(no++);
}, 0, 300, TimeUnit.MILLISECONDS);
}
@Override
public void cancel() {
cancelled = true;
}
});
};
Publisher<Integer> takePub = sub -> {
pub.subscribe(new Subscriber<Integer>() {
int count = 0;
Subscription subsc;
@Override
public void onSubscribe(Subscription s) {
subsc = s;
sub.onSubscribe(s);
}
@Override
public void onNext(Integer integer) {
sub.onNext(integer);
if(++count >= 5) {
subsc.cancel();
}
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
});
};
takePub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.debug("onNext: {}", integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError: {}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
});
}
}
'토비의봄' 카테고리의 다른 글
6. 비동기 RestTemplate, 비동기 MVC/Servlet (0) | 2021.03.06 |
---|---|
5. 자바와 스프링의 비동기 개발기술 (0) | 2021.03.05 |
4.2. Reactive Streams (0) | 2021.02.28 |
4.1. Reactive Streams (0) | 2021.02.28 |
3.2. Generics (0) | 2021.02.25 |