1. Java Future
Future
- 비동기적인 연산에 대한 결과를 가지고있는 것
- 다른쓰레드에서 사용한 결과를 가져오는 가장 기본이되는 인터페이스이다.
// 쓰레드풀
- 쓰레드를 생성하고 없애는데에는 CPU 사용이 많다.
- 이를 없애지않고, 반납한 뒤에 재활용하여 비용이 많이 들어가는 것을 최소화하기위해 사용
@Slf4j
public class FutureEx {
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newCachedThreadPool();
// Runnable 인터페이스를 받음
Future<String> f = es.submit(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
});
log.info(f.get()); // Blocking Method
log.info("Exit");
}
}
----
09:15:47.327 [pool-1-thread-1] INFO tv.toby.future.FutureEx - Async
09:15:47.331 [main] INFO tv.toby.future.FutureEx - Hello
09:15:47.331 [main] INFO tv.toby.future.FutureEx - Exit
결과 출력을 보니
- future의 get()은, 비동기작업이 완료되기전 까지 메인쓰레드가 진행하지 못한다. [Blocking]
순서를 바꿔서 출력한다면 다른작업을 동시에 수행할 수 있으며,
- Future를 통해 다른쓰레드에서 수행한 결과값을 가져올 수 있다.
@Slf4j
public class FutureEx {
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newCachedThreadPool();
// Runnable 인터페이스를 받음
Future<String> f = es.submit(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
});
log.info("Exit");
log.info(f.get()); // Blocking
}
}
---
09:20:35.696 [main] INFO tv.toby.future.FutureEx - Exit
09:20:37.699 [pool-1-thread-1] INFO tv.toby.future.FutureEx - Async
09:20:37.699 [main] INFO tv.toby.future.FutureEx - Hello
2. 콜백
FutureTask
- Callable / Runnable 인터페이스를 생성자를 통해 받아, es.execute 로 수행 가능
- done() 메소드는 비동기작업이 끝나면 수행되는 hook
- get() 메소드는 비동기작업의 결과를 갖고있음
@Slf4j
public class FutureEx {
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newCachedThreadPool();
FutureTask<String> f = new FutureTask<String>(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
}) {
@SneakyThrows
@Override
// 비동기작업이 완료되면 수행되는 일종의 hook
protected void done() {
log.info(get());
}
};
es.execute(f);
es.shutdown();
}
}
위의 코드를 기반으로
- FutureTask 를 상속받고,
- FutureTask 이후에 수행되는 Interface를 이용하여
- FutureTask 가 수행되면 done() 메소드를 이용하여 결과값이 출력되도록 하는
- 또한 예외에 대해 처리하는 Interface를 추가하여 예외또한 처리하는 콜백을 만들자
@Slf4j
public class FutureEx {
interface SuccessCallback {
void onSuccess(String result);
}
interface ExceptionCallback {
void onError(Throwable t);
}
public static class CallbackFutureTask extends FutureTask<String> {
SuccessCallback sc;
ExceptionCallback ec;
public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
super(callable);
this.sc = Objects.requireNonNull(sc);
this.ec = Objects.requireNonNull(ec);
}
// 비동기작업이 완료되면 수행되는 hook 을 이용하여 결과에대해서 처리할 수 있도록 함
@Override
protected void done() {
try {
sc.onSuccess(get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
ec.onError(e.getCause());
}
}
}
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newCachedThreadPool();
CallbackFutureTask cbf = new CallbackFutureTask(() -> {
Thread.sleep(2000);
if(1 == 1) throw new RuntimeException("Async Error!!");
log.info("Async");
return "Hello";
},
result -> log.info(result),
e -> log.info("Error: {}", e.getMessage()));
es.execute(cbf);
es.shutdown();
}
}
3. In Spring
@EnableAsync / @Async 어노테이션을 이용하여 스프링에서 비동기처리가 가능하다.
- 과거에는 SimpleAsyncTaskExecutor 가 사용됐으나, 현재는 ThreadPoolTaskExecutor 가 사용됨
@SpringBootApplication
@Slf4j
@EnableAsync
public class TobyApplication {
@Component
public static class MyService {
@Async // 결과를 바로 가져올 수 없음 (Future)
public Future<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(1000);
return new AsyncResult<>( "Hello");
}
}
}
Future 대신 ListenableFuture 를 사용하여 콜백을 등록할 수 있다.
@Component
public static class MyService {
@Async // 결과를 바로 가져올 수 없음
public ListenableFuture<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(1000);
return new AsyncResult<>( "Hello");
}
}
---
usage
ListenableFuture<String> f = myService.hello();
f.addCallback(s -> System.out.println(s), e -> System.out.println(e.getMessage()));
ThreadPoolTaskExecutor
@Bean
ThreadPoolTaskExecutor tp() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(10); // 처음 쓰레드에 요청이 오는 경우 만드는 쓰레드풀의 기본 사이즈
te.setQueueCapacity(200); // CorePoolSize 가 가득찬 경우에 쓰레드를 받기위해 대기하는 대기열
te.setMaxPoolSize(100); // 최대 pool size (QueueCapacity 가 꽉 차면 최대까지 사이즈를 늘림)
te.setKeepAliveSeconds(10); // 이시간동안 안쓰이면 쓰레드 제거
te.setTaskDecorator(..); // 쓰레드를 새로만들거나 반환하는 시점에 앞뒤에 콜백을 걸어줄 수 있음
te.setThreadNamePrefix("sjleeThread"); // 이름걸기
te.initialize(); // 초기화 이후에 리턴하면 됨
return te;
}
4. 비동기 서블릿
서블릿은 기본적으로 blocking 방식이다.
- HttpRequest / HttpResponse 를 사용하여 처리하는 방식으로 구현되어있는데
- HttpRequest / HttpResponse 가 Input, OutputStream을 사용하여 요청을 처리하는데 Input, OutputStream이 blocking IO이기 때문이다.
- Blocking 이라면 지속적인 Context switching 으로 인한 손해 등등...
비동기 서블릿
- 위의 블로킹방식과는 다르게, 작업이 blocking 이라면
- 서블릿 쓰레드를 반환하고
- Working Thread 가 작업을 끝낸 뒤 결과물을 반환할 때 콜백을 통해 응답을 처리하는 서블릿쓰레드를 요청하여 응답을 처리한다.
- 하지만 worker Thread 의 개수는 많아질 것이다.
DeferredResult Queue
- DeferredResult 로 결과를 담아놓고 있다가 결과를 반환함 (Working Thread 의 개수가 많아지는 문제 없음)
- dr.setResult() 등 호출하면 결과를 반환
@RestController
public static class MyController {
Queue<DeferredResult<String>> results = new ConcurrentLinkedQueue<>();
@GetMapping("/dr")
public DeferredResult<String> dr() {
log.info("dr");
DeferredResult<String> dr = new DeferredResult<>(600000L);
results.add(dr);
return dr;
}
@GetMapping("/dr/event")
public String drEvent(String msg) {
for(DeferredResult<String> dr : results) {
// 해당부분 호출시에 [/dr] 에 대한 결과 반환
dr.setResult("Hello " + msg);
results.remove(dr);
}
return "OK";
}
}
emitter
- 데이터를 여러번에 쪼개서 보내는 기술
@RestController
public static class MyController {
// 여러번에 데이터를 쪼개서 보내는 기술
@GetMapping("/emitter")
public ResponseBodyEmitter emitter() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
Executors.newSingleThreadExecutor().submit(() -> {
try {
for(int i = 1 ; i <= 50 ; i++) {
emitter.send("<p>Stream " + i + "</p>");
Thread.sleep(100);
}
} catch (Exception e) {
e.printStackTrace();
}
});
return emitter;
}
}
'토비의봄' 카테고리의 다른 글
7. AsyncTemplate 의 콜백헬, 중복작업 문제 (0) | 2021.03.08 |
---|---|
6. 비동기 RestTemplate, 비동기 MVC/Servlet (0) | 2021.03.06 |
4.3. Reactive Streams (0) | 2021.03.03 |
4.2. Reactive Streams (0) | 2021.02.28 |
4.1. Reactive Streams (0) | 2021.02.28 |