✏️[모던 자바 인 액션, 전문가를 위한 자바 8, 9, 10 기법 가이드] 스터디 관련 책 내용을 정리한 글입니다.
📌 이 장의 내용
- Thread, Future, 자바가 풍부한 동시성 API를 제공하도록 강요하는 진화의 힘
- 비동기 API
- 동시 컴퓨팅의 박스와 채널 뷰
- CompletableFuture 콤비네이터로 박스를 동적으로 연결
- 리액티브 프로그래밍용 자바 9 플로 API의 기초를 이루는 발행 구독 프로토콜
- 리액티브 프로그래밍과 리액티브 시스템
최근 소프트웨어 개발 방법에서는 멀티코어 프로세서를 활용한 병렬 처리 기술과 마이크로 서비스 아키텍처, 매시업 형태의 애플리케이션 개발이 주목되고 있습니다. 멀티태스크 프로그래밍의 양면성이 중요한 역할을 하며, 자바에서는 Future 인터페이스와 CompletableFuture, 자바 9에서는 플로 API를 통해 비동기적인 작업 처리가 가능하다. 이를 통해 여러 웹 서비스에 접근하는 애플리케이션 개발이 가능해졌습니다.
15.1 동시성을 구현하는 자바 지원의 진화
자바의 동시 프로그래밍 지원은 하드웨어, 소프트웨어 시스템, 프로그래밍 콘셉트의 변화에 맞춰 진화해 왔습니다. 초기 자바는 Runnable과 Thread를 이용한 동기화된 클래스와 메서드로 동시성을 지원했으며, 자바 5부터는 ExecutorService 인터페이스, Callable <T>, <T> Future <T>, <T> 제네릭 등을 추가하여 멀티코어 CPU에서 병렬 프로그래밍을 쉽게 구현할 수 있게 되었습니다. 자바 7에서는 java.util.concurrent.RecursiveTask를 추가하고, 자바 8에서는 스트림과 람다를 이용한 병렬 프로세싱이 추가되었습니다. 자바는 Future를 조합하는 기능도 추가하면서 동시성을 강화했고, 자바 9에서는 분산 비동기 프로그래밍을 명시적으로 지원합니다. 이를 바탕으로 리액티브 프로그래밍이라는 모델과 툴킷을 제공하며, CompletableFuture와 java.util.concurrent.Flow를 이용해 멀티코어 또는 여러 기기를 통한 병렬성을 쉽게 이용하는 것을 목표로 합니다.
15.1.1 스레드와 높은 수준의 추상화
운영 체제에서 각 사용자에게 프로세스를 할당하고, 가상 주소 공간을 제공하여 CPU를 번갈아가며 할당함으로써 실제로 마술 같은 일이 일어납니다. 멀티 코어 설정에서는 스레드를 사용하지 않으면 여러 프로세서 코어 중 한 개만을 사용하여 효율성을 고려합니다. 실제로 네 개의 코어를 가진 CPU에서 프로그램을 네 개의 코어에서 병렬로 실행하면 실행 속도를 네 배까지 향상시킬 수 있습니다. 물론 오버헤드로 인해 실제 네 배가 되긴 어렵습니다.
다음은 학생들이 제출한 숫자 1,000,000개를 저장한 배열을 처리하는 구현 코드입니다.
// 아래 코드는 한 개의 코어로 며칠 동안 작업을 수행합니다.
long sum = 0;
for (int i = 0; i < 1_000_000; i++) {
sum += stats[i];
}
// 아래 코드는 첫 스레드를 다음 처럼 실행합니다.
long sum0 = 0;
for (int i = 0; i < 250_000; i++) {
sum0 += stats[i];
}
// 네 번째 스레드는 다음으로 끝납니다.
long sum3 = 0;
for (int i = 750_000; i < 1_000_000; i++) {
sum3 += stats[i];
}
// 메인 프로그램은 네 개의 스레드를 완성하고
// 자바의 .start()로 실행한 다음 .join()으로 완료될 때까지 기다렸다가 다음을 계산합니다.
sum = sum0 + ... + sum3;
7장에서는 자바의 병렬성 지원을 스트림을 이용한 내부 반복으로 간단하게 구현하는 방법을 설명했습니다. 이를 통해 스레드 사용 패턴을 추상화하고 불필요한 코드를 줄일 수 있습니다. 또한 자바 7의 java.util.concurrent.RecursiveTask를 이용해 배열 합을 효율적으로 계산하는 방법도 제공했습니다. 이를 통해 포크/조인 스레드 추상화로 분할 그리고 정복 알고리즘을 병렬화하는 높은 수준의 방식을 구현할 수 있습니다. 추가로 자바 5의 ExecutorService 개념과 스레드 풀을 살펴보겠습니다.
15.1.2 Executor와 스레드 풀
자바 5에서는 Executor 프레임워크와 스레드 풀을 도입하여, 자바 프로그래머가 태스크 제출과 실행을 분리하고 스레드의 힘을 높은 수준으로 활용할 수 있게 되었습니다.
스레드의 문제
자바 스레드는 운영체제 스레드와 직접 연결되어 있으며, 운영체제 스레드를 만들고 종료하는 비용이 크고, 운영체제 스레드 개수가 제한되어 있어 주의해야 한다. 일부 운영 체제 스레드가 블록되거나 자고 있는 상황에서 모든 하드웨어 스레드가 코드를 실행하도록 할당되는 경우도 있으므로 하드웨어 스레드 개수에 따라 최적의 자바 스레드 개수가 달라진다.
스레드 풀 그리고 스레드 풀이 더 좋은 이유
자바 ExecutorService는 태스크를 제출하고 결과를 수집할 수 있는 인터페이스를 제공하며, newFixedThreadPool과 같은 팩토리 메서드를 이용해 스레드 풀을 만들어 사용할 수 있습니다.
ExecutorService newFixedThreadPool(int nThreads)
newFixedThreadPool 메서드는 워커 스레드 nThreads를 포함하는 ExecutorService를 만들고 스레드 풀에 저장합니다. 스레드 풀에서 사용하지 않은 스레드로 제출된 태스크를 먼저 실행하고, 실행이 종료되면 이들 스레드를 풀로 반환합니다. 프로그래머는 태스크를 제출하면 ExecutorService가 이를 실행합니다. 이 방식은 하드웨어에 맞는 수의 태스크를 유지하면서 수 천 개의 태스크를 아무 오버헤드 없이 제출할 수 있습니다.
스레드 풀 그리고 스레드 풀이 나쁜 이유
스레드 풀을 이용하는 것이 스레드를 직접 사용하는 것보다 바람직하지만, 두 가지 "사항"을 주의해야 합니다.
- 스레드 풀은 제한된 수의 스레드만 동시에 실행 가능하므로 블록 상황에서는 주의해야 합니다. 블록할 수 있는 태스크는 스레드 풀에 제출하지 말아야 하며, 데드락이 발생할 수 있기 때문에 항상 이를 지킬 수 있는 것은 아닙니다.
- 자바 프로그램에서 중요한 코드를 실행하는 스레드가 죽지 않도록 하기 위해, 보통 main이 반환하기 전에 모든 스레드의 작업이 끝나길 기다리고, 모든 스레드 풀을 종료하는 습관을 갖는 것이 중요하다. ExecutorService를 이용해 장기간 실행하는 인터넷 서비스를 관리하는 경우, Thread.setDaemon 메서드를 사용해 이러한 상황을 다룰 수 있다.
15.1.3 스레드의 다른 추상화 : 중첩되지 않은 메서드 호출
7장에서 설명한 동시성과 현재 설명하는 동시성의 차이는 엄격한 포크/조인과 외부 호출에서 종료되도록 기다리는 덜 엄격한 포크/조인의 차이입니다. 엄격한 포크/조인은 메서드 호출 내에서 태스크나 스레드가 시작되면 해당 메서드 호출이 반환하기 전에 작업이 끝나기를 기다렸지만, 외부 호출에서는 좀 더 여유로운 방식으로 작업을 실행합니다. 이러한 차이 때문에 덜 엄격한 포크/조인을 사용해도 비교적 안전하게 동시성 작업을 수행할 수 있습니다. 덜 엄격한 포크/조인은 사용자의 메서드 호출에 의해 스레드가 생성되고 메서드를 벗어나 계속 실행되는 비동기 메서드를 포함합니다. 이 장에서는 이러한 비동기 메서드를 활용하는 방법에 대해 다룹니다.
이들 메서드를 사용할 때 다음과 같은 위험성이 따릅니다.
- 스레드 실행은 메서드를 호출한 다음의 코드와 동시에 실행되므로 데이터 경쟁 문제를 일으키지 않도록 주의해야 합니다.
- 자바의 main() 메서드가 반환되기 전에 모든 스레드의 작업이 끝나도록 보장해야 합니다. 그렇지 않으면 두 가지 안전하지 못한 상황이 발생할 수 있습니다.
- 애플리케이션을 종료하지 못하고 모든 스레드가 실행을 끝낼 때까지 기다린다.
- 애플리케이션 종료를 방해하는 스레드를 강제종료시키고 애플리케이션을 종료한다.
자바 애플리케이션에서 모든 스레드를 추적하고 애플리케이션을 종료하기 전에 스레드 풀을 포함한 모든 스레드를 종료해야 합니다. 스레드는 setDaemon() 메서드를 이용해 데몬 또는 비데몬으로 구분할 수 있으며, 데몬 스레드는 애플리케이션이 종료될 때 강제 종료되므로 데이터 일관성을 파괴하지 않는 동작을 수행할 때 유용합니다. main() 메서드는 모든 비데몬 스레드가 종료될 때까지 프로그램을 종료하지 않고 기다립니다.
15.1.4 스레드에 무엇을 바라는가?
프로그램을 작은 태스크 단위로 구조화하여 모든 하드웨어 스레드를 활용해 병렬성의 장점을 극대화하는 것이 목표입니다. 이를 위해 7장에서는 병렬 스트림 처리와 포크/조인, 분할 정복 알고리즘을 다루었다.
15.2 동기 API와 비동기 API
7장에서는 자바 8 스트림을 이용하여 명시적으로 병렬 하드웨어를 이용할 수 있는 방법을 설명하였습니다. 이를 위해서는 외부 반복을 내부 반복으로 바꾸고 parallel() 메서드를 이용하여 요소를 병렬 처리해야 합니다. 또한, 내부 반복은 루프 기반의 계산을 제외한 다른 상황에서도 병렬성을 이용할 수 있는 장점이 있습니다. 이러한 병렬성을 이용하여 비동기 API를 구현하는 중요한 자바 개발 배경도 15장, 16장, 17장에서 다루고 있습니다.
다음과 같은 시그니처를 갖는 f, g 두 메서드의 호출을 합치는 예제를 구현하면 다음과 같습니다.
int f(int x);
int g(int x);
참고로 이들 메서드는 물리적 결과를 반환하므로 동기 API라 부릅니다. 다음처럼 두 메서드를 호출하고 합계를 출력하는 코드가 있습니다.
int y = f(x);
int z = g(x);
System.out.println(y + z);
f와 g를 실행하는 데 오랜 시간이 걸리는 경우, 자바 컴파일러가 코드 최적화를 수행하지 않을 수 있습니다. f와 g가 서로 상호작용하지 않거나 이에 대해 신경 쓰지 않는다면, 별도의 CPU 코어로 f와 g를 실행하여 시간을 단축할 수 있습니다. 그러나 이를 구현하면 다음처럼 코드가 복잡해지게 됩니다.
class ThreadExample {
public static void main(String[] args) throws InterruptedException {
int x = 1337;
Result result = new Result();
Thread t1 = new Thread() -> { result.left = f(x); } );
Thread t2 = new Thread() -> { result.right = g(x); } );
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(result.left + result.right);
}
private static class Result {
private int left;
private int right;
}
}
Future API 인터페이스를 사용하여 Runnable 대신 코드를 더 간단하게 구현할 수 있습니다. 이미 ExecutorService로 스레드 풀을 설정한 상황에서는, 위 기능을 다음과 같은 코드로 구현할 수 있습니다.
public class ExecutorServiceExample {
public static void main(String[] args)
throws ExecutionException, InterruptedExcetpion {
int x = 1337;
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<Integer> y = executorService.submit(() -> f(x));
Future<Integer> z = executorService.submit(() -> g(x));
System.out.println(y.get() + z.get());
executorService.shutdown();
}
}
코드의 불필요한 부분을 제거하고 명시적인 submit 메서드 호출을 없애기 위해 비동기 API를 이용할 수 있습니다. 첫 번째 방법인 자바 Future와 CompletableFuture를 이용하면 문제를 개선할 수 있습니다. 두 번째 방법인 java.util.concurrent.Flow 인터페이스를 이용하는 방법도 있습니다. 이러한 대안을 적용하면 f와 g의 시그니처가 어떻게 바뀔지 고민해봐야 합니다.
15.2.1 Future 형식 API
대안을 이용하면 f, g의 시그니처가 다음처럼 바뀝니다.
Future<Integer> f(int x);
Future<Integer> g(int x);
그리고 다음처럼 호출이 바뀝니다.
Future<Integer> y = f(x);
Future<Integer> z = g(x);
System.out.println(y.get() + z.get());
위 예제에서는 f와 g 메서드가 Future를 반환하며, get() 메서드를 통해 두 Future의 결과를 합쳐 반환하는 방식이 사용되었습니다. 그러나 이 방식은 두 가지 이유로 대규모 프로그램에서는 사용되지 않습니다.
- 다른 상황에서는 g에도 Future 형식이 필요할 수 있으므로 API 형식을 통일하는 것이 바람직합니다.
- 병렬 하드웨어로 프로그램 실행 속도를 극대화하려면 여러 작은 하지만 합리적인 크기의 태스크로 나눈 것이 좋습니다.
15.2.2 리액티브 형식 API
두 번째 대안에서 핵심은 f, g의 시그니처를 바꿔서 콜백 형식의 프로그래밍을 이용하는 것입니다.
void f(int x, IntConsumer dealWithResult);
f가 값을 반환하지 않는 경우에도 프로그램이 동작하도록 하기 위해서는 f에 추가 인수로 콜백(람다)을 전달하여 결과가 준비되면 이를 람다로 호출하는 태스크를 만들어야 합니다. 이를 통해 f는 바디를 실행하면서 태스크를 만들고 즉시 반환하게 되어, 코드 형식이 다음과 같이 변경됩니다.
public class CallbackStyleExample {
public static void main(String[] args) {
int x = 1337;
Result result = new Result();
f(x, (int y) -> {
result.left = y;
System.out.println((result.left + result.right));
});
g(x, (int z) -> {
result.right = z;
System.out.println((result.left + result.right));
});
}
}
결과가 달라져서 f와 g의 호출 합계를 정확하게 출력하지 못하고 상황에 따라 먼저 계산된 결과를 출력하는 문제가 발생하였습니다. 이 문제는 락을 사용하지 않아 값을 두 번 출력할 수 있을뿐더러, 때로는 +에 제공된 두 피연산자가 println이 호출되기 전에 업데이트될 수 있다는 것이 원인입니다. 다음처럼 두 가지 방법으로 이 문제를 보완할 수 있습니다.
- if-then-else를 이용해 적절한 락을 이용해 두 콜백이 모두 호출되었는지 확인한 다음 println을 호출해 원하는 기능을 수행할 수 있다.
- 리액티브 형식 API는 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었으므로 Future를 이용하는 것이 더 적절하다.
리액티브 형식의 프로그래밍에서는 메서드 f와 g가 dealWithResult 콜백을 여러 번 호출할 수 있으며, Future 형식의 API는 일회성의 값을 처리하는 데 적합합니다. API 선택은 코드를 더 복잡하게 만들 수 있지만, 사용하는 코드를 더 간단하고 높은 수준의 구조를 유지할 수 있도록 도와줍니다. 또한, 오래 걸리는 계산이나 네트워크/사용자 입력을 기다리는 메서드에서 이들 API를 잘 활용하면 애플리케이션의 효율성을 크게 향상시킬 수 있습니다. 이러한 API는 하단의 시스템을 효율적으로 활용하고 리소스를 낭비하지 않는 장점도 제공합니다. 이 내용은 다음 절에서 자세히 다룹니다.
15.2.3 잠자기(그리고 기타 블로킹 동작)는 해로운 것으로 간주
sleep() 메서드를 사용해 스레드를 잠들게 할 경우, 스레드는 여전히 시스템 자원을 점유하므로, 스레드 풀에서 잠자는 스레드는 다른 태스크가 시작되지 못하게 막는 문제가 발생할 수 있습니다. 이를 해결하기 위해, 기다리는 일을 만들지 않거나 예외를 일으키는 방법으로 처리할 수 있습니다. 또한, 태스크를 앞과 뒤로 나누고 블록되지 않을 때만 뒷부분을 자바가 스케쥴링하도록 요청할 수도 있습니다.
다음은 한 개의 작업을 갖는 코드 A입니다.
work1();
Thread.sleep(10000); // 10초동안 잠
work2();
코드 B와 비교하면 다음과 같습니다.
public class ScheduledExecutorServiceExample {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService
= Executors.newScheduledThreadPool(1);
work1();
scheduledExecutorService.schedule(
// work1()이 끝난 다음 10초 뒤에 work2()를 개별 태스크로 스케쥴함
ScheduledExecutorServiceExample::Work2, 10, TimeUnit.SECONDS);
scheduledExecutorService.shutdown();
)
public static void work1() {
System.out.println("Hello from Work1!");
}
public static void work2() {
System.out.println("Hello form Work2!");
}
}
}
코드 A와 B는 둘 다 스레드 풀에서 실행되지만, A는 자는 동안 귀중한 스레드 자원을 점유하고, B는 다른 작업이 실행될 수 있도록 허용합니다. 태스크를 만들 때는 이런 특징을 잘 활용해야 하며, 태스크를 블록하는 것보다는 다음 작업을 태스크로 제출하고 현재 태스크를 종료하는 것이 바람직합니다. 이를 따르면 스레드 제한이 있을 때 효율적인 코드를 작성할 수 있습니다. 스레드에 제한이 없고 저렴하다면 코드 A와 B는 사실상 같지만, 스레드에는 제한이 있고 저렴하지 않으므로 가능하면 코드 B의 형식을 따르는 것이 좋습니다.
15.2.4 현실성 확인
새로운 시스템을 설계할 때는 가능한 많은 작은 동시 실행 태스크를 사용하여 블록하는 동작을 비동기 호출로 구현하는 것이 좋지만, 이 원칙을 모든 API에 적용하기는 어렵습니다. 자바에서는 2002년부터 비블록 I/O 기능을 제공하고 있으며, 개선된 동시성 API를 사용하여 성능을 향상시킬 수 있는 상황을 찾아보는 것이 좋습니다. 또한 Netty와 같은 새로운 라이브러리를 사용하면 네트워크 서버의 블록/비블록 API를 일관적으로 제공할 수 있습니다.
15.2.5 비동기 API에서 예외는 어떻게 처리하는가?
비동기 API에서 호출된 메서드의 바디는 별도의 스레드에서 실행되며, 호출자의 실행 범위와는 관계 없는 상황에서 예상치 못한 예외가 발생할 수 있습니다. 이 경우, CompletableFuture에서는 get() 메서드에 예외를 처리할 수 있는 기능을 제공하고, exceptionally() 메서드를 사용하여 회복할 수 있습니다. 반면, 리액티브 형식의 비동기 API에서는 return이 아닌 기존 콜백이 호출되므로, 예외가 발생하면 실행될 추가 콜백을 만들어 인터페이스를 변경해야 합니다. 다음 예제처럼 리액티브 API에는 여러 콜백이 포함해야 합니다.
void f(int x, Consumer<Integer> dealWithResult,
Consumer<Throwable> dealWithException);
// f의 바디는 다음을 수행할 수 있습니다.
deaWithException(e);
여러 개의 콜백을 제공하는 대신, 한 객체로 이를 감싸는 것이 좋습니다. 예를 들어 자바 9의 플로 API에서는 여러 콜백을 한 객체로 감싸는 Subscriber <T><T> 클래스를 제공합니다. 이를 통해 각각 대표하는 네 개의 콜백을 포함할 수 있습니다. 다음은 그 예제입니다.
void onComplete();
void onError(Throwable throwable);
void onNext(T item);
값이 있을 때(onNext), 도중에 에러가 발생했을 때(onError), 값을 다 소진했거나 에러가 발생해서 더 이상 처리할 데이터가 없을 때(onComplete) 각각의 콜백이 호출되며, 이를 적용하면 다음처럼 f의 시그니처가 바뀝니다.
void f(int x, Subscriber<Integer> s);
// f의 바디는 다음처럼 Throwable를 가리키는 t로 예외가 일어났음을 가리킵니다.
s.onError(t);
여러 콜백을 포함하는 API를 파일이나 키보드 장치에서 숫자를 읽는 작업과 비교하면, 이들 장치가 수동적인 데이터 구조체가 아니라 일련의 이벤트를 만들어냅니다. API에서 이벤트는 메시지나 이벤트로 불립니다. API는 이벤트의 순서를 전혀 고려하지 않으며, 일반적으로 프로토콜을 부속 문서에서 정의하여 "onComplete 이벤트 다음에는 아무 이벤트도 일어나지 않음"과 같은 구문을 사용합니다.
15.3 박스와 채널 모델
동시성 모델을 설계하고 개념화하기 위해서는 그림이 필요합니다. 이를 위해 박스와 채널 모델을 사용합니다. 예를 들어, 정수와 관련된 간단한 상황에서 f(x)와 g(x)를 호출하고 p 함수에 인수 x를 이용해 호출하며 그 결과를 q1과 q2에 전달하고, 이 두 호출의 결과로 함수 r을 호출한 다음 결과를 출력하는 일련의 태스크를 그림으로 표현할 수 있습니다. 이를 위해 클래스 C의 메서드와 연산 함수 C::m을 구분하지 않습니다. 아래 그림과 같이 각 태스크는 박스로 표현되며, 박스는 각각의 입력과 출력 채널을 가지고 있습니다. 이러한 박스들 사이를 이어주는 채널을 통해 데이터가 전달되며, 이를 통해 각 태스크는 동시에 실행될 수 있습니다.
위 그림을 자바로 두 가지 방법으로 구현하고 어떤 문제가 있는지 확인합니다. 다음은 첫 번째 구현 방법입니다.
int t = p(x);
System.out.println( r(q1(t), q2(t)) );
위의 첫 번째 구현 방법은 겉보기엔 깔끔해 보이지만, 자바에서 q1, q2를 차례로 호출하는 것은 하드웨어 병렬성의 활용과 거리가 멉니다.
Future를 이용해 f, g를 병렬로 평가하는 방법도 있습니다.
int t = p(x);
Future<Integer> a1 = executorService.submit(() -> q1(t));
Future<Integer> a2 = executorService.submit(() -> q2(t));
System.out.println( r(a1.get(), a2.get()) );
위 예제에서는 p와 r 함수를 Future로 감싸지 않았지만, 코드로 구현하면 위 예제에서 원하는 작업과 거리가 있습니다.
위 예제에서는 p와 r 함수를 Future로 감싸지 않았으며, 아래처럼 코드를 흉내 내어 구현하면 실행 속도를 늦출 수 있고, 병렬성을 저해할 수
있어서 원하는 작업과 거리가 있습니다.
System.out.println( r(q1(t), q2(t)) + s(x) );
위 코드의 병렬성을 극대화하려면 모든 다섯 함수(p, q2, q2, r, s)를 Future로 감싸야하기 때문입니다.
시스템에서 많은 작업이 동시에 실행되고 있을 때, 위에서 구현한 방법은 병렬성을 제대로 활용하지 못하고, 데드락 등의 문제가 발생할 수 있습니다. 이러한 문제를 해결하기 위해 자바 8에서는 CompletableFuture와 콤비네이터를 이용해 다른 Function을 얻을 수 있는 compose(), andThen() 등의 메서드를 제공합니다. 이를 이용하면 add1 함수와 double 함수를 조합하여 인수를 두 배로 만들고 결과에 2를 더하는 Function을 구현할 수 있습니다.
Function<Integer, Integer> myfun = add1.andThen(double);
박스와 채널 다이어그램을 이용해 위 그림을 구현하는 것은 자바 Function과 BiFunction을 이용해 가능합니다. p, q1, q2, r 함수를 각각 Function과 BiFunction으로 구현하고, compose()와 andThen() 메서드를 이용해 이를 조합할 수 있습니다.
p.thenBoth(q1, q2).thenCombine(r)
15.4절에서는 CompletableFuture와 콤비네이터를 이용해 작업을 구조화하고, get()을 이용해 태스크가 기다리게 만드는 일을 피할 수 있는 방법을 설명합니다. 박스와 채널 모델은 대규모 시스템 구현의 추상화 수준을 높여주며, 콤비네이터를 이용해 내부적으로 작업을 처리하는 관점으로 바꿔줍니다. 마찬가지로 자바 8 스트림은 자료 구조를 반복해야 하는 코드를 내부적으로 작업을 처리하는 스팀 콤비네이터로 바꿔줍니다. 마지막으로 15.5절에서는 박스와 채널 다이어그램의 각 채널을 마블 다이어그램으로 표현하는 방법을 설명합니다.
15.4 CompletableFuture와 콤비네이터를 이용한 동시성
Future 인터페이스는 동시 코딩 작업을 처리하기 위한 인터페이스이지만, 역사적으로 FutureTask 구현을 넘어서는 다양한 동작을 제공했습니다. 자바 8에서는 CompletableFuture를 도입하여 Future를 더욱 구조화하고, 조합할 수 있는 기능을 추가했습니다. CompletableFuture는 실행할 코드 없이 Future를 만들 수 있고, complete() 메서드를 통해 값을 이용해 다른 스레드가 완료할 수 있도록 허용합니다. 이를 통해 f(x)와 g(x)를 동시에 실행해 합계를 구하는 코드를 구현할 수 있습니다. 이러한 이유로 CompletableFuture라고 부르게 되었습니다.
public class CFComplete {
public static void main(String[] args)
throws ExecutorException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
int x = 1337;
CompletableFuture<Integer> a = new CompletableFuture<>();
executorService.submit(() -> a.complete(f(x)));
int b = g(x);
System.out.println(a.get() + b);
executorService.shutdown();
}
}
또는 다음처럼 구현할 수 있습니다.
public class CFComplete {
public static void main(String[] args)
throws ExecutorException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
int x = 1337;
CompletableFuture<Integer> a = new CompletableFuture<>();
executorService.submit(() -> b.complete(g(x)));
int a = f(x);
System.out.println(a + b.get());
executorService.shutdown();
}
}
위 코드에서는 f(x) 또는 g(x)의 실행이 끝나기 전에 get()을 호출하여 프로세싱 자원을 낭비할 가능성이 있습니다. 하지만 자바 8의 CompletableFuture를 사용하면 이러한 문제를 해결할 수 있습니다.
자바에서 동작을 조합하는 것은 강력한 프로그래밍 구조 사상이지만, 람다식이 도입되기 전까지는 이 기능이 많이 사용되지 않았습니다. 자바 8에서 추가된 람다식을 사용하여 동작을 조합하는 것이 가능해졌고, 이를 이용하여 다음 예제처럼 스트림에 연산을 조합하는 것도 가능해졌습니다.
myStream.map(...).filter(...).sun()
Function 인터페이스에 compose(), andThen() 메서드를 사용하여 다른 Function을 얻을 수 있습니다. CompletableFuture <T>에는<T> thenCombine() 메서드를 사용하여 두 연산 결과를 더 효과적으로 더할 수 있습니다. thenCombine() 메서드는 다음과 같은 제네릭과 와일드카드와 관련된 문제를 간소화하기 위해 특정 시그니처를 갖고 있습니다.
CompletableFuture<V> thenCombine(CompletableFuture<U> other,
BiFunction<T, U, V> fn)
thenCombine() 메서드는 두 개의 CompletableFuture 값(T, U 결과 형식)을 받아한 개의 새 값을 만들며, 두 작업이 끝나면 두 결과 모두에 fn을 적용하고 블록 하지 않은 상태로 결과 Future를 반환합니다. 이전 코드에서 이 메서드를 사용하여 f(x)와 g(x)를 동시에 실행하고 합계를 구하는 코드를 더 간결하게 구현할 수 있습니다.
public class CFCombine {
public static void main(String[] args) throws ExecutionException,
InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
int x = 1337;
CompletableFuture<Integer> a = new CompletableFuture<>();
CompletableFuture<Integer> a = new CompletableFuture<>();
CompletableFuture<Integer> c = a.thenCombine(b, (y, z) -> y + z);
executorService.submit(() -> a.complete(f(x)));
executorService.submit(() -> b.complete(g(x)));
System.out.println(c.get());
executorService.shutdown();
}
}
thenCombine() 메서드를 사용하면 Future a와 Future b의 결과를 모르는 상태에서 두 연산이 모두 끝날 때까지 블록하지 않고 결과를 추가하는 세 번째 연산 c를 만들 수 있습니다. 이전 버전의 코드에서 발생했던 블록 문제를 해결할 수 있으며, 실제 필요한 스레드는 한 개지만 스레드 풀에서 두 개의 스레드가 여전히 활성 상태입니다. 일부 상황에서는 get()을 기다리는 스레드가 큰 문제가 되지 않을 수 있지만, 여러 질의를 처리해야 하는 상황에서는 CompletableFuture와 콤비네이터를 사용하여 get()에서 블록하지 않고 병렬 실행의 효율성을 높이고 데드락을 피하는 최상의 해결책을 구현할 수 있습니다.
15.5 발행-구독 그리고 리액티브 프로그래밍
15.5.1 두 플로를 합치는 예제
15.5.2 역압력
15.5.3 실제 역압력의 간단한 형태
15.6 리액티브 시스템 vs 리액티브 프로그래밍
'📚 서적 및 학습자료 > 모던 자바 인 액션[2회독]' 카테고리의 다른 글
[모던 자바 인 액션] 17장 리액티브 프로그래밍 (0) | 2023.04.02 |
---|---|
[모던 자바 인 액션] 16장 CompletableFuture : 안정적 비동기 프로그래밍 (0) | 2023.04.02 |
[모던 자바 인 액션] 14장 자바 모듈 시스템 (0) | 2023.03.27 |
[모던 자바 인 액션] 13장 디폴트 메서드 (0) | 2023.03.26 |
[모던 자바 인 액션] 12장 새로운 날짜와 시간 API (0) | 2023.03.26 |