CompletableFuture: несколько задач

Как я могу асинхронно выполнить 20 выполняемых задач (или 1 задачу 20 раз), используя 5 CompletableFutures?

Вот что у меня есть:

Runnable task = () -> {
        long startTime = System.currentTimeMillis();
        Random random = new Random();

        while (System.currentTimeMillis() - startTime < 3000) {
            DoubleStream.generate(() -> random.nextDouble())
                    .limit(random.nextInt(100))
                    .map(n -> Math.cos(n))
                    .sum();
        }
        System.out.println("Done");
    };

    for (int i = 0; i < 4; i++) {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(task);
        CompletableFuture<Void> future4 = CompletableFuture.runAsync(task);
        CompletableFuture<Void> future5 = CompletableFuture.runAsync(task);
        future1.get();
        future2.get();
        future3.get();
        future4.get();
        future5.get();
    }

Если я выполню этот код, то увижу, что он выполняет только 3 future.get() асинхронно: 3, а затем 2, оставленные в течение 1 для итерации ()

Итак, я бы хотел выполнить все 20 заданий максимально асинхронно

3 ответа

Вы можете использовать allOf для одновременного запуска нескольких задач. Сначала я создал комбинацию из 5 задач (так же, как в вашем вопросе), но затем я добавил 10 (и только дважды выполнил) и получил половину времени выполнения.

for (int i = 0; i < 2; i++) {
   CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
   CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
  // and so on until ten  
   CompletableFuture<Void> future10 = CompletableFuture.runAsync(task);

   CompletableFuture<Void> combined = CompletableFuture.allOf(future1, future2, future3, future4, future5, future6, future7, future8, future9, future10);

   combined.get();
}

Исполнитель по умолчанию CompletableFuture это общий пул ForkJoinPool, который имеет целевой параллелизм по умолчанию, соответствующий количеству ядер ЦП минус одно. Таким образом, если у вас четыре ядра, максимум три задания будут выполняться асинхронно. Поскольку вы заставляете ждать завершения каждые 5 заданий, вы получите три параллельных выполнения, за которыми следуют два параллельных выполнения в каждой итерации цикла.

Если вы хотите получить конкретную стратегию выполнения, такую ​​как параллелизм по вашему выбору, лучше всего указать правильно настроенного исполнителя. Затем вы должны позволить исполнителю управлять параллелизмом, а не ждать в цикле.

ExecutorService pool = Executors.newFixedThreadPool(5);

for (int i = 0; i < 20; i++) {
    CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks

Это позволяет выполнять пять параллельных заданий, но позволит каждому из пяти потоков получить новое задание сразу после его завершения, вместо того, чтобы ждать следующей итерации цикла.

Но когда вы говорите

Итак, я бы хотел выполнить все 20 заданий максимально асинхронно

непонятно, почему вы заставляете ждать после планирования пяти заданий вообще. Максимальный параллелизм может быть достигнут через

ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
    CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks

Это может порождать столько потоков, сколько заданий, если только одно задание не будет выполнено до того, как все запланировано, поскольку в этом случае рабочий поток может забрать новое задание.

Но эта логика не требует CompletableFuture совсем. Вы также можете использовать:

ExecutorService pool = Executors.newCachedThreadPool();
// schedule 20 jobs and return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();

Но когда ваша работа не связана с вводом / выводом или каким-либо другим видом ожидания, соотв. освобождая процессор, нет смысла создавать больше потоков, чем ядра процессора. Пул, настроенный на число процессоров, является предпочтительным.

ExecutorService pool = Executors.newWorkStealingPool(
    Runtime.getRuntime().availableProcessors());
// schedule 20 jobs at return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();

В вашем особом случае это, вероятно, работает медленнее, так как ваши задания используют системное время, чтобы казаться работающим быстрее при наличии большего количества потоков, чем ядер, но на самом деле выполняют меньше работы. Но для обычной вычислительной задачи это повысит производительность.

Установите следующее системное свойство равным числу потоков, которые вы хотите использовать в общем пуле соединений вилки:

java.util.concurrent.ForkJoinPool.common.parallelism 

Посмотреть ForkJoinPool

Причина в том, что вы не указываете свой собственный пул соединений форка при построении завершаемых фьючерсов, поэтому он неявно использует

ForkJoinPool.commonPool()

Смотрите CompletableFurure

Другие вопросы по тегам