1. CompletableFuture
Future (java)
- An object that holds the result of an asynchronous computation
ListenableFuture (spring)
- Future + callbacks
CompletableFuture
- Makes it simple to obtain and chain asynchronous results
Blocking style
- Retrieve the result by calling CompletableFuture.get()
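public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> f = new CompletableFuture<>();
    f.complete(2);                                    // completes the future normally with 2
    f.completeExceptionally(new RuntimeException());  // no effect here: f is already completed
    System.out.println(f.get());                      // returns immediately because f is done; prints 2
}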
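To make the blocking behavior of get() more concrete, here is a minimal sketch of both outcomes: a future completed with a value and a future completed with an exception. The class name BlockingGetSketch and the helper describe are hypothetical and only serve this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class BlockingGetSketch {
    // Hypothetical helper: blocks on get() and reports either the value or the failure type.
    static String describe(CompletableFuture<Integer> f) {
        try {
            return "value=" + f.get();               // blocks until f is completed
        } catch (ExecutionException e) {
            // get() wraps the original exception; getCause() returns it
            return "failed=" + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();      // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> ok = new CompletableFuture<>();
        ok.complete(2);
        // The future is already completed, so this call is ignored and returns false.
        boolean changed = ok.completeExceptionally(new RuntimeException());

        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(describe(ok) + " changed=" + changed); // value=2 changed=false
        System.out.println(describe(failed));                     // failed=IllegalStateException
    }
}
```

Only the first completion wins, which is why calling complete and completeExceptionally on the same future does not produce an error from get().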
Asynchronous style
- Dependent asynchronous tasks can be chained simply.
- Two or more asynchronous tasks can also be started, and their results used once they have all finished.
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture
        .runAsync(() -> log.info("runAsync"))
        .thenRun(() -> log.info("thenRun1"))
        .thenRun(() -> log.info("thenRun2"));
    log.info("exit");
    // The common pool ignores shutdown(); awaitTermination() on it waits until the pool is quiescent,
    // which keeps main alive long enough for the chain above to log.
    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
----
09:27:33.593 [main] INFO tv.toby.completablefuture.CFuture - exit
09:27:33.593 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - runAsync
09:27:33.596 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenRun1
09:27:33.596 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenRun2
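The point above about starting two or more asynchronous tasks and using their results once they finish is not shown in the original example. One way to sketch it is with thenCombine; the class and method names here are hypothetical, and join() is used only so the example can return a value.

```java
import java.util.concurrent.CompletableFuture;

class CombineSketch {
    // Starts two independent tasks and adds their results once both are done.
    static int sumOfTwoTasks() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        // thenCombine invokes the function only after both a and b have completed
        return a.thenCombine(b, Integer::sum).join();
    }

    public static void main(String[] args) {
        System.out.println(sumOfTwoTasks()); // prints 3
    }
}
```

For more than two tasks, CompletableFuture.allOf can be used in a similar way, although it yields a CompletableFuture<Void> and the individual results have to be read from the original futures.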
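- Starting from supplyAsync, the chain can be used the same way as the one implemented in the previous chapter.
- supplyAsync(Supplier<T>)
- thenApply(Function<T, R>)
- thenAccept(Consumer<T>)
- thenCompose (for a function that already returns a CompletableFuture)
- exceptionally(Function<Throwable, T>)
- If any of the preceding stages fails, the chain continues with the value returned here.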
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture
        .supplyAsync(() -> {
            log.info("supplyAsync");
            return 1;
        })
        .thenApply(s -> {
            log.info("thenApply {}", s);
            return s + 1;
        })
        .thenCompose(s -> {
            log.info("thenCompose {}", s);
            return CompletableFuture.completedFuture(s * 3); // wrap the value in a CompletableFuture
        })
        .thenAccept(s -> log.info("thenAccept {}", s));
    log.info("exit");
    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
-----
09:37:01.755 [main] INFO tv.toby.completablefuture.CFuture - exit
09:37:01.755 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - supplyAsync
09:37:01.757 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenApply 1
09:37:01.759 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenCompose 2
09:37:01.759 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenAccept 6
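Why thenCompose is needed when the function already returns a CompletableFuture may be easier to see next to thenApply. The following sketch assumes a hypothetical asynchronous step tripleAsync; with thenApply the result is nested, while thenCompose flattens it.

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {
    // Hypothetical async step: triples the input on the common pool.
    static CompletableFuture<Integer> tripleAsync(int n) {
        return CompletableFuture.supplyAsync(() -> n * 3);
    }

    // thenCompose flattens the inner future, so the chain stays CompletableFuture<Integer>.
    static int withThenCompose() {
        CompletableFuture<Integer> flat =
                CompletableFuture.supplyAsync(() -> 2).thenCompose(n -> tripleAsync(n));
        return flat.join();
    }

    // thenApply keeps the inner future as the value, producing a nested type.
    static int withThenApply() {
        CompletableFuture<CompletableFuture<Integer>> nested =
                CompletableFuture.supplyAsync(() -> 2).thenApply(n -> tripleAsync(n));
        return nested.join().join(); // two joins are needed to reach the value
    }

    public static void main(String[] args) {
        System.out.println(withThenCompose()); // prints 6
        System.out.println(withThenApply());   // prints 6
    }
}
```

Both variants reach the same value, but only the thenCompose version can be extended with further thenApply or thenAccept stages that see the Integer directly.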
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture
        ...
        .thenCompose(s -> {
            log.info("thenCompose {}", s);
            if (1 == 1) throw new RuntimeException();
            return CompletableFuture.completedFuture(s * 3); // wrap the value in a CompletableFuture
        })
        .exceptionally(e -> 10) // if any stage above fails, continue with 10
        .thenAccept(s -> log.info("thenAccept {}", s));
    log.info("exit");
    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
----
09:40:05.673 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - supplyAsync
09:40:05.673 [main] INFO tv.toby.completablefuture.CFuture - exit
09:40:05.675 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenApply 1
09:40:05.678 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenCompose 2
09:40:05.678 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenAccept 10
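One detail the log above does not show is what the handler passed to exceptionally actually receives. As a hedged sketch: stages between the failure and the handler are skipped, and the Throwable may arrive wrapped in a CompletionException, so reading the cause defensively is a reasonable habit. The class name and messages below are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionallySketch {
    static String recover() {
        return CompletableFuture
                .supplyAsync(() -> 1)
                .thenApply(n -> {
                    if (n == 1) throw new IllegalStateException("boom");
                    return "ok " + n;
                })
                .thenApply(s -> s + " (not reached)") // skipped because the previous stage failed
                .exceptionally(e -> {
                    // The failure may be wrapped in a CompletionException; unwrap it if so.
                    Throwable cause =
                            (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
                    return "recovered from " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(recover()); // prints "recovered from boom"
    }
}
```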
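- Appending Async to these methods lets a stage run on a separate background thread.
- ex) thenApplyAsync(Function<T, R>, Executor)
- Without Async, the stage keeps occupying the thread that invoked it (usually the one that completed the previous stage).
- In the AsyncRestTemplate controller code of section 2, that means the netty thread stays occupied.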
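The effect of the Async variants can be sketched by passing a dedicated executor and checking which thread runs the stage. The executor and the thread name "my-worker" are assumptions made for this illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncExecutorSketch {
    // Returns the name of the thread that executed the thenApplyAsync stage.
    static String threadOfAsyncStage() {
        // Hypothetical dedicated pool with a recognizable thread name.
        ExecutorService es = Executors.newSingleThreadExecutor(r -> new Thread(r, "my-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> 1)                                    // runs on the common pool
                    .thenApplyAsync(n -> Thread.currentThread().getName(), es) // runs on es
                    .join();
        } finally {
            es.shutdown(); // let the JVM exit once the work is done
        }
    }

    public static void main(String[] args) {
        System.out.println(threadOfAsyncStage()); // prints "my-worker"
    }
}
```

Handing the follow-up work to a separate executor like this frees the thread that completed the previous stage, which matters when that thread belongs to a small I/O event loop.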
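2. Improving callback hell
Advantages after the change
- No separate error callback is needed at every step to decide how a failure is handled
- The code becomes a simple, flat sequential chain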
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
    DeferredResult<String> dr = new DeferredResult<>();
    ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello " + idx);
    f1.addCallback(s -> {
        ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
        f2.addCallback(s2 -> {
            ListenableFuture<String> f3 = myService.work(s2.getBody());
            f3.addCallback(s3 -> {
                dr.setResult(s3);
            }, e3 -> {
                dr.setErrorResult(e3.getMessage());
            });
        }, e2 -> {
            dr.setErrorResult(e2.getMessage());
        });
    }, e -> {
        dr.setErrorResult(e.getMessage());
    });
    return dr;
}
@RestController
public static class MyController {
    @Autowired
    MyService myService;

    AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();
        toCf(rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello " + idx))
            .thenCompose(s -> toCf(rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody())))
            .thenCompose(s -> toCf(myService.work(s.getBody())))
            .thenAccept(s -> dr.setResult(s))
            .exceptionally(e -> {
                dr.setErrorResult(e.getMessage());
                return null;
            });
        return dr;
    }

    public <T> CompletableFuture<T> toCf(ListenableFuture<T> lf) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        // Forward the ListenableFuture's outcome (success or failure) into the CompletableFuture
        lf.addCallback(s -> cf.complete(s), e -> cf.completeExceptionally(e));
        return cf;
    }
}
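The toCf adapter can be tried without Spring. The sketch below replaces ListenableFuture with a hypothetical CallbackSource interface that has the same addCallback shape; succeed and fail are likewise made-up helpers that fire their callback immediately. It is an illustration of the adapter idea under those assumptions, not the Spring API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CallbackAdapterSketch {
    // Hypothetical stand-in for a callback-style future such as ListenableFuture.
    interface CallbackSource<T> {
        void addCallback(Consumer<T> onSuccess, Consumer<Throwable> onFailure);
    }

    // Same idea as toCf above: forward both outcomes into a CompletableFuture.
    static <T> CompletableFuture<T> toCf(CallbackSource<T> src) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        src.addCallback(cf::complete, cf::completeExceptionally);
        return cf;
    }

    // Hypothetical sources that succeed or fail immediately.
    static CallbackSource<String> succeed(String v) {
        return (ok, err) -> ok.accept(v);
    }

    static CallbackSource<String> fail(String msg) {
        return (ok, err) -> err.accept(new IllegalStateException(msg));
    }

    // Two dependent steps written as a flat chain instead of nested callbacks.
    static String chain() {
        return toCf(succeed("hello"))
                .thenCompose(s -> toCf(succeed(s + " world")))
                .join();
    }

    // A failure in the first step skips thenApply and reaches the single error handler.
    static String failing() {
        return toCf(fail("down"))
                .thenApply(s -> "ok " + s)
                .exceptionally(e -> "error")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(chain());   // prints "hello world"
        System.out.println(failing()); // prints "error"
    }
}
```

One exceptionally at the end covers every step, which is the advantage the nested-callback version lacks.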