CompletableFuture thenCompose VS thenComposeAsync
Основываясь на разных постах и статьях, я знаю только то, что: thenCompose будет работать в том же потоке с предыдущим этапом, тогда как thenComposeAsync попытается запустить новый поток по сравнению с предыдущим этапом.
Даже Java 8 в действии предоставляет следующий код, чтобы продемонстрировать, что thenCompose будет работать в том же потоке.
Цитата из Java 8 в действии Глава 11.4.3
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
Но я попробовал следующий код, результат меня довольно смутил, что, скорее всего, противоречит тому, что thenCompose запустит новый поток, тогда как thenComposeAsync будет повторно использовать старый поток.
public static void main(String... args) {
testBasic();
testAsync();
}
private static void testBasic() {
out.println("*****************************************");
out.println("********** TESTING thenCompose **********");
ExecutorService executorService = Executors.newCachedThreadPool();
CompletableFuture[] futures = Stream.iterate(0, i -> i+1)
.limit(3)
.map(i -> CompletableFuture.supplyAsync(() -> runStage1(i), executorService))
.map(future -> future.thenCompose(i -> CompletableFuture.supplyAsync(() -> runStage2(i), executorService)))
.map(f -> f.thenAccept(out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
}
private static void testAsync() {
out.println("*****************************************");
out.println("******* TESTING thenComposeAsync ********");
ExecutorService executorService = Executors.newCachedThreadPool();
CompletableFuture[] futures = Stream.iterate(0, i -> i+1)
.limit(3)
.map(i -> CompletableFuture.supplyAsync(() -> runStage1(i), executorService))
.map(future -> future.thenComposeAsync(i ->
CompletableFuture.supplyAsync(() -> runStage2(i), executorService)))
.map(f -> f.thenAccept(out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
}
private static Integer runStage1(int a) {
String s = String.format("Start: stage - 1 - value: %d - thread name: %s",
a, Thread.currentThread().getName());
out.println(s);
Long start = System.currentTimeMillis();
try {
Thread.sleep(1500 + Math.abs(new Random().nextInt()) % 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
s = String.format("Finish: stage - 1 - value: %d - thread name: %s - time cost: %d",
a, Thread.currentThread().getName(), (System.currentTimeMillis() - start));
out.println(s);
return Integer.valueOf(a % 3);
}
private static Integer runStage2(int b) {
String s = String.format("Start: stage - 2 - value: %d - thread name: %s",
b, Thread.currentThread().getName());
out.println(s);
Long start = System.currentTimeMillis();
try {
Thread.sleep(200 + Math.abs(new Random().nextInt()) % 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
s = String.format("Finish: stage - 2 - value: %d - thread name: %s - time cost: %d",
b, Thread.currentThread().getName(), (System.currentTimeMillis() - start));
out.println(s);
return Integer.valueOf(b);
}
И результат таков:
*****************************************
********** TESTING thenCompose **********
Start: stage - 1 - value: 1 - thread name: pool-1-thread-2
Start: stage - 1 - value: 0 - thread name: pool-1-thread-1
Start: stage - 1 - value: 2 - thread name: pool-1-thread-3
Finish: stage - 1 - value: 1 - thread name: pool-1-thread-2 - time cost: 1571
Start: stage - 2 - value: 1 - thread name: pool-1-thread-4 // using a new thread?????
Finish: stage - 1 - value: 2 - thread name: pool-1-thread-3 - time cost: 1875
Start: stage - 2 - value: 2 - thread name: pool-1-thread-2
Finish: stage - 1 - value: 0 - thread name: pool-1-thread-1 - time cost: 2198
Start: stage - 2 - value: 0 - thread name: pool-1-thread-3
Finish: stage - 2 - value: 2 - thread name: pool-1-thread-2 - time cost: 442
2
Finish: stage - 2 - value: 1 - thread name: pool-1-thread-4 - time cost: 779
1
Finish: stage - 2 - value: 0 - thread name: pool-1-thread-3 - time cost: 1157
0
*****************************************
******* TESTING thenComposeAsync ********
Start: stage - 1 - value: 0 - thread name: pool-2-thread-1 // all in same thread
Start: stage - 1 - value: 1 - thread name: pool-2-thread-2
Start: stage - 1 - value: 2 - thread name: pool-2-thread-3
Finish: stage - 1 - value: 0 - thread name: pool-2-thread-1 - time cost: 1623
Start: stage - 2 - value: 0 - thread name: pool-2-thread-1
Finish: stage - 1 - value: 1 - thread name: pool-2-thread-2 - time cost: 1921
Start: stage - 2 - value: 1 - thread name: pool-2-thread-2
Finish: stage - 1 - value: 2 - thread name: pool-2-thread-3 - time cost: 1932
Start: stage - 2 - value: 2 - thread name: pool-2-thread-3
Finish: stage - 2 - value: 0 - thread name: pool-2-thread-1 - time cost: 950
0
Finish: stage - 2 - value: 2 - thread name: pool-2-thread-3 - time cost: 678
2
Finish: stage - 2 - value: 1 - thread name: pool-2-thread-2 - time cost: 956
1
Что-то не так в демо? Пожалуйста, поправьте меня или, если есть какие-то полезные ресурсы, поделитесь пожалуйста.