Toby's Spring

8. CompletableFuture

by 이석준석이 2021. 3. 9.

1. CompletableFuture

 

Future (java)

  • An object that holds the result of an asynchronous computation

ListenableFuture (spring)

  • Future + Callbacks

CompletableFuture

  • Makes it simple to obtain the result of an asynchronous computation

Blocking style

  • Fetch the result with CompletableFuture.get(), which blocks the calling thread until the future completes
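// Needs java.util.concurrent.CompletableFuture and java.util.concurrent.ExecutionException.
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> f = new CompletableFuture<>();
    f.complete(2);                                   // completes the future with the value 2
    f.completeExceptionally(new RuntimeException()); // ignored: f is already completed
    System.out.println(f.get());                     // returns immediately and prints 2
}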
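
In the snippet above only the first completion counts, so the exception never surfaces. As a small sketch of what get() does in both cases, the example below uses a made-up helper named describe (not part of the original post) that blocks on a future and reports either the value or the cause wrapped in the ExecutionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class BlockingGetDemo {

    // Hypothetical helper: blocks on get() and reports the value or the failure.
    static String describe(CompletableFuture<Integer> f) {
        try {
            return "value=" + f.get();
        } catch (ExecutionException e) {
            // get() wraps the original failure; getCause() returns it.
            return "failed=" + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> ok = new CompletableFuture<>();
        ok.complete(2);
        // Already completed, so this call is ignored and returns false.
        boolean changed = ok.completeExceptionally(new RuntimeException());
        System.out.println(describe(ok) + ", changed=" + changed); // value=2, changed=false

        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(describe(failed)); // failed=IllegalStateException
    }
}
```

Swapping the order of complete and completeExceptionally in the original snippet would make get() throw instead of printing 2.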

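Asynchronous style

  • Dependent asynchronous tasks, where one runs after another finishes, are simple to express.
  • You can also run two or more asynchronous tasks and use their results once they have all finished.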
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture
            .runAsync(() -> log.info("runAsync"))
            .thenRun(() -> log.info("thenRun1"))
            .thenRun(() -> log.info("thenRun2"));
    log.info("exit");

    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
----
09:27:33.593 [main] INFO tv.toby.completablefuture.CFuture - exit
09:27:33.593 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - runAsync
09:27:33.596 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenRun1
09:27:33.596 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenRun2
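
The run above covers the dependent case. For the other bullet, running two or more tasks and using the results once they have finished, a minimal sketch could use thenCombine for two futures and allOf for a list. The class and method names here are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CombineDemo {

    // Starts two independent tasks and adds their results once both are done.
    static int sumOfTwo() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        return a.thenCombine(b, Integer::sum).join();
    }

    // Waits for every future in the list, then sums the results.
    static int sumOfAll(List<CompletableFuture<Integer>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                // allOf yields Void, so the values are read back from the inputs;
                // join() does not block here because every input is already done.
                .thenApply(v -> futures.stream().mapToInt(CompletableFuture::join).sum())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(sumOfTwo()); // 3
    }
}
```

The join() calls at the end are only there so the demo can print a value; in a real chain you would keep composing instead of blocking.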

 

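  • Starting from supplyAsync, you can chain stages much like the implementation we built in the previous chapter.
    • supplyAsync(Supplier<T>)
    • thenApply(Function<T, R>)
    • thenAccept(Consumer<T>)
    • thenCompose(Function<T, CompletionStage<R>>): for a function that already returns a CompletableFuture
    • exceptionally(Function<Throwable, T>)
      • If any of the preceding stages fails, the chain continues with the value returned here.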
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture
            .supplyAsync(() -> {
                log.info("supplyAsync");
                return 1;
            })
            .thenApply(s -> {
                log.info("thenApply {}", s);
                return s + 1;
            })
            .thenCompose(s -> {
                log.info("thenCompose {}", s);
                return CompletableFuture.completedFuture(s * 3); // wrap the value in a CompletableFuture
            })
            .thenAccept(s -> log.info("thenAccept {}", s));
    log.info("exit");

    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
-----
09:37:01.755 [main] INFO tv.toby.completablefuture.CFuture - exit
09:37:01.755 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - supplyAsync
09:37:01.757 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenApply 1
09:37:01.759 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenCompose 2
09:37:01.759 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenAccept 6
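
To make the difference between thenApply and thenCompose concrete, here is a small sketch. tripleAsync is a hypothetical asynchronous step, not something from the original code. Passing it to thenApply gives a future nested inside a future, while thenCompose flattens it.

```java
import java.util.concurrent.CompletableFuture;

class ComposeDemo {

    // Hypothetical async step: triples the input on another thread.
    static CompletableFuture<Integer> tripleAsync(int n) {
        return CompletableFuture.supplyAsync(() -> n * 3);
    }

    static int viaCompose() {
        // thenCompose flattens: the result type is CompletableFuture<Integer>.
        CompletableFuture<Integer> flat =
                CompletableFuture.supplyAsync(() -> 2).thenCompose(ComposeDemo::tripleAsync);
        return flat.join();
    }

    static int viaApply() {
        // thenApply does not flatten: the result is a future of a future.
        CompletableFuture<CompletableFuture<Integer>> nested =
                CompletableFuture.supplyAsync(() -> 2).thenApply(ComposeDemo::tripleAsync);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(viaCompose()); // 6
        System.out.println(viaApply());   // 6
    }
}
```

Both give the same value, but only the thenCompose version can be chained further without unwrapping by hand.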
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture
			...
            .thenCompose(s -> {
                log.info("thenCompose {}", s);
                if(1 == 1) throw new RuntimeException();
                return CompletableFuture.completedFuture(s * 3); // wrap the value in a CompletableFuture
            })
            .exceptionally(e -> 10) // if any stage above fails, continue with 10 instead
            .thenAccept(s -> log.info("thenAccept {}", s));
    log.info("exit");

    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
----
09:40:05.673 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - supplyAsync
09:40:05.673 [main] INFO tv.toby.completablefuture.CFuture - exit
09:40:05.675 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenApply 1
09:40:05.678 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenCompose 2
09:40:05.678 [ForkJoinPool.commonPool-worker-9] INFO tv.toby.completablefuture.CFuture - thenAccept 10
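
One detail worth knowing when the handler inspects the exception: if an earlier stage threw, the Throwable passed to exceptionally is usually a CompletionException wrapping the original one. The sketch below assumes a made-up unwrap helper and a made-up rule that negative input fails, and shows both the normal path and the recovery path.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionallyDemo {

    // Hypothetical helper: strips the CompletionException wrapper, if there is one.
    static Throwable unwrap(Throwable e) {
        return (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
    }

    static String recover(int input) {
        return CompletableFuture
                .supplyAsync(() -> input)
                .thenApply(s -> {
                    if (s < 0) throw new IllegalStateException("negative input: " + s);
                    return "ok " + s;
                })
                // Skipped when nothing failed; otherwise supplies a replacement value.
                .exceptionally(e -> "recovered from " + unwrap(e).getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(recover(1));  // ok 1
        System.out.println(recover(-1)); // recovered from negative input: -1
    }
}
```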

 

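  • Appending Async to the methods above lets a stage run on a separate background thread.
    • ex) thenApplyAsync(Function<T, R>, Executor)
  • Without Async, a stage keeps running on the thread that completed the previous stage.
    • In the AsyncRestTemplate code, that means it keeps occupying netty's thread.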
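
As a rough illustration of the Async variant with an explicit Executor, the sketch below hands the second stage to a single-thread pool and returns the name of the thread that ran it. The pool and the thread name my-pool-1 are made up for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncStageDemo {

    // Returns the name of the thread that ran the thenApplyAsync stage.
    static String threadOfAsyncStage() {
        // Single-thread pool with a recognizable (made-up) thread name.
        ExecutorService es = Executors.newSingleThreadExecutor(r -> new Thread(r, "my-pool-1"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> 1) // no executor given: default async pool
                    .thenApplyAsync(s -> Thread.currentThread().getName(), es) // runs on es
                    .join();
        } finally {
            es.shutdown(); // let the JVM exit once the demo is done
        }
    }

    public static void main(String[] args) {
        System.out.println(threadOfAsyncStage()); // my-pool-1
    }
}
```

Moving work off the completing thread this way is what keeps a small event-loop pool free for I/O.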

2. Fixing callback hell

 

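Advantages after the change

  • No need for a separate error callback at every step to decide what to do when an exception occurs
  • The code flattens into a simple, linear chain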
// Before: nested ListenableFuture callbacks, each with its own error handler
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
    DeferredResult<String> dr = new DeferredResult<>();

    ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello " + idx);
    f1.addCallback(s -> {
        ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
        f2.addCallback(s2 -> {
            ListenableFuture<String> f3 = myService.work(s2.getBody());
            f3.addCallback(s3 -> {
                dr.setResult(s3);
            }, e3 -> {
                dr.setErrorResult(e3.getMessage());
            });
        }, e2 -> {
            dr.setErrorResult(e2.getMessage());
        });
    },
    e -> {
        dr.setErrorResult(e.getMessage());
    });

    return dr;
}
// After: the same flow as a single CompletableFuture chain
@RestController
public static class MyController {
    @Autowired
    MyService myService;

    AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();

        toCf(rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello " + idx))
                .thenCompose(s -> toCf(rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody())))
                .thenCompose(s -> toCf(myService.work(s.getBody())))
                .thenAccept(s -> dr.setResult(s))
                .exceptionally(e -> {
                    dr.setErrorResult(e.getMessage());
                    return null;
                });

        return dr;
    }

    public <T> CompletableFuture<T> toCf(ListenableFuture<T> lf) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        // Relay the ListenableFuture's result or failure into the CompletableFuture
        lf.addCallback(s -> cf.complete(s), e -> cf.completeExceptionally(e));
        return cf;
    }
}
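
The shape of that chain can be tried without Spring. The sketch below is only an approximation: service1, service2 and work are hypothetical stand-ins for the two remote calls and myService.work(), and service2 is made to reject requests starting with "bad" so the error path can be seen. A failure in any step skips the remaining steps and lands in the single exceptionally handler.

```java
import java.util.concurrent.CompletableFuture;

class ChainDemo {

    // Hypothetical stand-ins for the two remote calls and myService.work().
    static CompletableFuture<String> service1(String req) {
        return CompletableFuture.supplyAsync(() -> req + "/service1");
    }

    static CompletableFuture<String> service2(String req) {
        return CompletableFuture.supplyAsync(() -> {
            if (req.startsWith("bad")) throw new IllegalArgumentException("service2 rejected " + req);
            return req + "/service2";
        });
    }

    static CompletableFuture<String> work(String req) {
        return CompletableFuture.supplyAsync(() -> req + "/work");
    }

    // One linear chain with a single error handler at the end.
    static String call(String req) {
        return service1(req)
                .thenCompose(ChainDemo::service2)
                .thenCompose(ChainDemo::work)
                // Failures usually arrive wrapped, so prefer the cause's message.
                .exceptionally(e -> "error: "
                        + (e.getCause() != null ? e.getCause().getMessage() : e.getMessage()))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(call("hello")); // hello/service1/service2/work
        System.out.println(call("bad"));   // error: service2 rejected bad/service1
    }
}
```

The join() is only for printing in the demo; the controller instead hands the result to the DeferredResult inside thenAccept.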

 
