본문 바로가기
토비의봄

5. 자바와 스프링의 비동기 개발기술

by 이석준석이 2021. 3. 5.

1. Java Future

 

Future

  • 비동기적인 연산에 대한 결과를 가지고있는 것
  • 다른쓰레드에서 사용한 결과를 가져오는 가장 기본이되는 인터페이스이다.

 

// 쓰레드풀

  • 쓰레드를 생성하고 없애는데에는 CPU 사용이 많다.
  • 이를 없애지않고, 반납한 뒤에 재활용하여 비용이 많이 들어가는 것을 최소화하기위해 사용

@Slf4j
public class FutureEx {
    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newCachedThreadPool();

        // Runnable 인터페이스를 받음
        Future<String> f = es.submit(() -> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        });
        log.info(f.get()); // Blocking Method

        log.info("Exit");
    }
}
----
09:15:47.327 [pool-1-thread-1] INFO tv.toby.future.FutureEx - Async
09:15:47.331 [main] INFO tv.toby.future.FutureEx - Hello
09:15:47.331 [main] INFO tv.toby.future.FutureEx - Exit

결과 출력을 보니

  • future의 get()은, 비동기작업이 완료되기전 까지 메인쓰레드가 진행하지 못한다. [Blocking]

순서를 바꿔서 출력한다면 다른작업을 동시에 수행할 수 있으며,

  • Future를 통해 다른쓰레드에서 수행한 결과값을 가져올 수 있다.
@Slf4j
public class FutureEx {
    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newCachedThreadPool();

        // Runnable 인터페이스를 받음
        Future<String> f = es.submit(() -> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        });

        log.info("Exit");

        log.info(f.get()); // Blocking
    }
}
---
09:20:35.696 [main] INFO tv.toby.future.FutureEx - Exit
09:20:37.699 [pool-1-thread-1] INFO tv.toby.future.FutureEx - Async
09:20:37.699 [main] INFO tv.toby.future.FutureEx - Hello

2. 콜백

 

FutureTask 

  • Callable / Runnable 인터페이스를 생성자를 통해 받아, es.execute 로 수행 가능
  • done() 메소드는 비동기작업이 끝나면 수행되는 hook
  • get() 메소드는 비동기작업의 결과를 갖고있음
@Slf4j
public class FutureEx {
    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newCachedThreadPool();

        FutureTask<String> f = new FutureTask<String>(() -> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        }) {
            @SneakyThrows
            @Override
            // 비동기작업이 완료되면 수행되는 일종의 hook
            protected void done() {
                log.info(get());
            }
        };

        es.execute(f);
        es.shutdown();
    }
}

위의 코드를 기반으로

  • FutureTask 를 상속받고,
  • FutureTask 이후에 수행되는 Interface를 이용하여
  • FutureTask 가 수행되면 done() 메소드를 이용하여 결과값이 출력되도록 하는 
  • 또한 예외에 대해 처리하는 Interface를 추가하여 예외또한 처리하는 콜백을 만들자
@Slf4j
public class FutureEx {
    interface SuccessCallback {
        void onSuccess(String result);
    }

    interface ExceptionCallback {
        void onError(Throwable t);
    }

    public static class CallbackFutureTask extends FutureTask<String> {
        SuccessCallback sc;
        ExceptionCallback ec;
        public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
            super(callable);
            this.sc = Objects.requireNonNull(sc);
            this.ec = Objects.requireNonNull(ec);
        }

        // 비동기작업이 완료되면 수행되는 hook 을 이용하여 결과에대해서 처리할 수 있도록 함
        @Override
        protected void done() {
            try {
                sc.onSuccess(get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                ec.onError(e.getCause());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newCachedThreadPool();

        CallbackFutureTask cbf = new CallbackFutureTask(() -> {
            Thread.sleep(2000);
            if(1 == 1) throw new RuntimeException("Async Error!!");
            log.info("Async");
            return "Hello";
        },
        result -> log.info(result),
        e -> log.info("Error: {}", e.getMessage()));

        es.execute(cbf);
        es.shutdown();
    }
}

 


3. In Spring

 

@EnableAsync / @Async 어노테이션을 이용하여 스프링에서 비동기처리가 가능하다.

  • 과거에는 SimpleAsyncTaskExecutor 가 사용됐으나, 현재는 ThreadPoolTaskExecutor 가 사용됨

 

@SpringBootApplication
@Slf4j
@EnableAsync
public class TobyApplication {

    @Component
    public static class MyService {
        @Async // 결과를 바로 가져올 수 없음 (Future)
        public Future<String> hello() throws InterruptedException {
            log.info("hello()");
            Thread.sleep(1000);
            return new AsyncResult<>( "Hello");
        }
    }
}

Future 대신 ListenableFuture 를 사용하여 콜백을 등록할 수 있다.

@Component
public static class MyService {

    @Async // 결과를 바로 가져올 수 없음
    public ListenableFuture<String> hello() throws InterruptedException {
        log.info("hello()");
        Thread.sleep(1000);
        return new AsyncResult<>( "Hello");
    }
}

---
usage

ListenableFuture<String> f = myService.hello();
f.addCallback(s -> System.out.println(s), e -> System.out.println(e.getMessage()));

ThreadPoolTaskExecutor

@Bean
ThreadPoolTaskExecutor tp() {
    ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
    te.setCorePoolSize(10); // 처음 쓰레드에 요청이 오는 경우 만드는 쓰레드풀의 기본 사이즈
    te.setQueueCapacity(200); // CorePoolSize 가 가득찬 경우에 쓰레드를 받기위해 대기하는 대기열
    te.setMaxPoolSize(100); // 최대 pool size (QueueCapacity 가 꽉 차면 최대까지 사이즈를 늘림)
    te.setKeepAliveSeconds(10); // 이시간동안 안쓰이면 쓰레드 제거
    te.setTaskDecorator(..); // 쓰레드를 새로만들거나 반환하는 시점에 앞뒤에 콜백을 걸어줄 수 있음
    te.setThreadNamePrefix("sjleeThread"); // 이름걸기
    te.initialize(); // 초기화 이후에 리턴하면 됨

    return te;
}

4. 비동기 서블릿

 

서블릿은 기본적으로 blocking 방식이다.

  • HttpRequest / HttpResponse 를 사용하여 처리하는 방식으로 구현되어있는데
  • HttpRequest / HttpResponse 가 Input, OutputStream을 사용하여 요청을 처리하는데 Input, OutputStream이 blocking IO이기 때문이다.
  • Blocking 이라면 지속적인 Context switching 으로 인한 손해 등등...


비동기 서블릿

  • 위의 블로킹방식과는 다르게, 작업이 blocking 이라면
    • 서블릿 쓰레드를 반환하고
    • Working Thread 가 작업을 끝낸 뒤 결과물을 반환할 때 콜백을 통해 응답을 처리하는 서블릿쓰레드를 요청하여 응답을 처리한다.
  • 하지만 worker Thread 의 개수는 많아질 것이다.


DeferredResult Queue

  • DeferredResult 로 결과를 담아놓고 있다가 결과를 반환함 (Working Thread 의 개수가 많아지는 문제 없음)
    • dr.setResult() 등 호출하면 결과를 반환

@RestController
public static class MyController {
    Queue<DeferredResult<String>>  results = new ConcurrentLinkedQueue<>();

    @GetMapping("/dr")
    public DeferredResult<String> dr() {
        log.info("dr");
        DeferredResult<String> dr = new DeferredResult<>(600000L);
        results.add(dr);
        return dr;
    }

    @GetMapping("/dr/event")
    public String drEvent(String msg) {
        for(DeferredResult<String> dr : results) {
            // 해당부분 호출시에 [/dr] 에 대한 결과 반환
            dr.setResult("Hello " + msg);
            results.remove(dr);
        }
        return "OK";
    }
}

emitter

  • 데이터를 여러번에 쪼개서 보내는 기술
@RestController
public static class MyController {
    // 여러번에 데이터를 쪼개서 보내는 기술
    @GetMapping("/emitter")
    public ResponseBodyEmitter emitter() {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();

        Executors.newSingleThreadExecutor().submit(() -> {
            try {
                for(int i = 1 ; i <= 50 ; i++) {
                    emitter.send("<p>Stream " + i + "</p>");
                    Thread.sleep(100);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        return emitter;
    }
}

'토비의봄' 카테고리의 다른 글

7. AsyncTemplate 의 콜백헬, 중복작업 문제  (0) 2021.03.08
6. 비동기 RestTemplate, 비동기 MVC/Servlet  (0) 2021.03.06
4.3. Reactive Streams  (0) 2021.03.03
4.2. Reactive Streams  (0) 2021.02.28
4.1. Reactive Streams  (0) 2021.02.28