CompletableFuture thenCompose VS thenComposeAsync

Основываясь на разных постах и ​​статьях, я знаю только то, что: thenCompose будет работать в том же потоке с предыдущим этапом, тогда как thenComposeAsync попытается запустить новый поток по сравнению с предыдущим этапом.

Даже Java 8 в действии предоставляет следующий код, чтобы продемонстрировать, что thenCompose будет работать в том же потоке.

Цитата из Java 8 в действии Глава 11.4.3

public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures =
        shops.stream()
             .map(shop -> CompletableFuture.supplyAsync(    
                              () -> shop.getPrice(product), executor))
             .map(future -> future.thenApply(Quote::parse)) 
             .map(future -> future.thenCompose(quote -> 
                         CompletableFuture.supplyAsync(
                           () -> Discount.applyDiscount(quote), executor)))
                .collect(toList());

    return priceFutures.stream()
            .map(CompletableFuture::join)   
            .collect(toList());
}

Но я попробовал следующий код, результат меня довольно смутил, что, скорее всего, противоречит тому, что thenCompose запустит новый поток, тогда как thenComposeAsync будет повторно использовать старый поток.

    public static void main(String... args) {
        testBasic();
        testAsync();
    }
    private static void testBasic() {
        out.println("*****************************************");
        out.println("********** TESTING thenCompose **********");
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletableFuture[] futures = Stream.iterate(0, i -> i+1)
                .limit(3)
                .map(i -> CompletableFuture.supplyAsync(() -> runStage1(i), executorService))
                .map(future -> future.thenCompose(i -> CompletableFuture.supplyAsync(() -> runStage2(i), executorService)))
                .map(f -> f.thenAccept(out::println))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futures).join();
    }

    private static void testAsync() {
        out.println("*****************************************");
        out.println("******* TESTING thenComposeAsync ********");
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletableFuture[] futures = Stream.iterate(0, i -> i+1)
                .limit(3)
                .map(i -> CompletableFuture.supplyAsync(() -> runStage1(i), executorService))
                .map(future -> future.thenComposeAsync(i ->
                        CompletableFuture.supplyAsync(() -> runStage2(i), executorService)))
                .map(f -> f.thenAccept(out::println))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futures).join();
    }

    private static Integer runStage1(int a) {
        String s = String.format("Start: stage - 1 - value: %d - thread name: %s",
                a, Thread.currentThread().getName());
        out.println(s);
        Long start = System.currentTimeMillis();
        try {
            Thread.sleep(1500 + Math.abs(new Random().nextInt()) % 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        s = String.format("Finish: stage - 1 - value: %d - thread name: %s - time cost: %d",
                a, Thread.currentThread().getName(), (System.currentTimeMillis() - start));
        out.println(s);
        return Integer.valueOf(a % 3);
    }

    private static Integer runStage2(int b) {
        String s = String.format("Start: stage - 2 - value: %d - thread name: %s",
                b, Thread.currentThread().getName());
        out.println(s);
        Long start = System.currentTimeMillis();
        try {
            Thread.sleep(200 + Math.abs(new Random().nextInt()) % 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        s = String.format("Finish: stage - 2 - value: %d - thread name: %s - time cost: %d",
                b, Thread.currentThread().getName(), (System.currentTimeMillis() - start));
        out.println(s);
        return Integer.valueOf(b);
    }

И результат таков:

*****************************************
********** TESTING thenCompose **********
Start: stage - 1 - value: 1 - thread name: pool-1-thread-2
Start: stage - 1 - value: 0 - thread name: pool-1-thread-1
Start: stage - 1 - value: 2 - thread name: pool-1-thread-3
Finish: stage - 1 - value: 1 - thread name: pool-1-thread-2 - time cost: 1571
Start: stage - 2 - value: 1 - thread name: pool-1-thread-4  // using a new thread?????
Finish: stage - 1 - value: 2 - thread name: pool-1-thread-3 - time cost: 1875
Start: stage - 2 - value: 2 - thread name: pool-1-thread-2
Finish: stage - 1 - value: 0 - thread name: pool-1-thread-1 - time cost: 2198
Start: stage - 2 - value: 0 - thread name: pool-1-thread-3
Finish: stage - 2 - value: 2 - thread name: pool-1-thread-2 - time cost: 442
2
Finish: stage - 2 - value: 1 - thread name: pool-1-thread-4 - time cost: 779
1
Finish: stage - 2 - value: 0 - thread name: pool-1-thread-3 - time cost: 1157
0
*****************************************
******* TESTING thenComposeAsync ********
Start: stage - 1 - value: 0 - thread name: pool-2-thread-1 // all in same thread
Start: stage - 1 - value: 1 - thread name: pool-2-thread-2
Start: stage - 1 - value: 2 - thread name: pool-2-thread-3
Finish: stage - 1 - value: 0 - thread name: pool-2-thread-1 - time cost: 1623
Start: stage - 2 - value: 0 - thread name: pool-2-thread-1
Finish: stage - 1 - value: 1 - thread name: pool-2-thread-2 - time cost: 1921
Start: stage - 2 - value: 1 - thread name: pool-2-thread-2
Finish: stage - 1 - value: 2 - thread name: pool-2-thread-3 - time cost: 1932
Start: stage - 2 - value: 2 - thread name: pool-2-thread-3
Finish: stage - 2 - value: 0 - thread name: pool-2-thread-1 - time cost: 950
0
Finish: stage - 2 - value: 2 - thread name: pool-2-thread-3 - time cost: 678
2
Finish: stage - 2 - value: 1 - thread name: pool-2-thread-2 - time cost: 956
1

Что-то не так в демо? Пожалуйста, поправьте меня или, если есть какие-то полезные ресурсы, поделитесь пожалуйста.

0 ответов

Другие вопросы по тегам