Как ждать завершения всех потоков, используя ExecutorService?

Мне нужно выполнить некоторое количество задач 4 за один раз, что-то вроде этого:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

Как я могу получить уведомление, когда все они будут завершены? Пока я не могу думать о чем-то лучше, чем установить какой-либо глобальный счетчик задач и уменьшить его в конце каждой задачи, а затем отслеживать в бесконечном цикле этот счетчик, чтобы он стал 0; или получить список фьючерсов и в бесконечном цикле монитора isDone для всех из них. Каковы лучшие решения без бесконечных циклов?

Благодарю.

27 ответов

Решение

В основном на ExecutorService ты звонишь shutdown() а потом awaitTermination():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}

Используйте CountDownLatch:

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

и в рамках вашей задачи (приложите в try / finally)

latch.countDown();

ExecutorService.invokeAll() делает это для вас.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);

Вы также можете использовать списки фьючерсов:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

затем, когда вы хотите присоединиться ко всем из них, это, по сути, эквивалентно объединению каждого из них (с дополнительным преимуществом в том, что оно вновь вызывает исключения из дочерних потоков в основной):

for(Future f: this.futures) { f.get(); }

По сути, уловка заключается в том, чтобы вызывать.get() для каждого Future по одному, вместо бесконечного зацикливания, вызывая isDone() on (все или каждый). Таким образом, вы гарантированно "продвинетесь" через и через этот блок, как только закончится последний поток. Предостережение заключается в том, что, поскольку вызов.get() повторно вызывает исключения, если один из потоков умирает, вы могли бы подняться из этого, возможно, до того, как другие потоки закончат работу до завершения [чтобы избежать этого, вы можете добавить catch ExecutionException вокруг получения вызова. Другое предостережение заключается в том, что он сохраняет ссылку на все потоки, поэтому, если у них есть локальные переменные потока, они не будут собираться до тех пор, пока вы не пройдете этот блок (хотя, возможно, вы сможете обойти это, если это станет проблемой, удалив Будущее вне ArrayList). Если вы хотите знать, какое будущее "заканчивается первым", вы можете использовать что-то вроде /questions/13622051/ozhidanie-zaversheniya-neskolkih-potokov-v-java/13622066#13622066

В Java8 вы можете сделать это с CompletableFuture:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

Просто мои два цента. Чтобы преодолеть требование CountDownLatch Чтобы заранее узнать количество задач, вы можете сделать это по старинке, используя простую Semaphore,

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

В вашей задаче просто позвоните s.release() как бы вы latch.countDown();

Немного опоздал к игре, но ради завершения...

Вместо того, чтобы "ждать" выполнения всех заданий, вы можете думать по принципу Голливуда "не звони мне, я позвоню тебе" - когда я закончу. Я думаю, что результирующий код более элегантный...

Гуава предлагает несколько интересных инструментов для достижения этой цели.

Пример::

Обернуть ExecutorService в ListeningExecutorService::

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

Отправить коллекцию вызываемых для исполнения::

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

Теперь основная часть:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

Прикрепите обратный вызов к ListenableFuture, который вы можете использовать, чтобы получать уведомления, когда все фьючерсы завершены:

        Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
        @Override
        public void onSuccess(List<Integer> result) {
            log.info("@@ finished processing {} elements", Iterables.size(result));
            // do something with all the results
        }

        @Override
        public void onFailure(Throwable t) {
            log.info("@@ failed because of :: {}", t);
        }
    });

Это также дает то преимущество, что вы можете собрать все результаты в одном месте после завершения обработки...

Больше информации здесь

Класс CyclicBarrier в Java 5 и более поздних версиях предназначен для такого рода вещей.

Здесь есть два варианта, просто запутайте, какой из них лучше всего выбрать.

Опция 1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

Вариант 2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

Здесь положить future.get(); в попытке поймать это хорошая идея, верно?

Следуйте одному из следующих подходов.

  1. Перебирать все будущие задачи, возвращенные из submit на ExecutorService и проверить статус с блокировкой вызова get() на Future объект в соответствии с предложением Kiran
  2. использование invokeAll() на ExecutorService
  3. CountDownLatch
  4. ForkJoinPool или Executors.html#newWorkStealingPool
  5. использование shutdown, awaitTermination, shutdownNow API ThreadPoolExecutor в правильной последовательности

Связанные вопросы SE:

Как CountDownLatch используется в многопоточности Java?

Как правильно отключить Java ExecutorService

Вы можете обернуть ваши задачи в другой runnable, который будет отправлять уведомления:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});

Чистый способ с ExecutorService

 List<Future<Void>> results = null;
 try {
     List<Callable<Void>> tasks = new ArrayList<>();
     ExecutorService executorService = Executors.newFixedThreadPool(4);
     results = executorService.invokeAll(tasks);
 } catch (InterruptedException ex) {
     ...
 } catch (Exception ex) {
     ...
 }

Это моё решение, основанное на подсказке AdamSkywalker, и оно работает

package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

    void procesar() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = getTasks();
        CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        es.shutdown();

        System.out.println("FIN DEL PROCESO DE HILOS");
    }

    private List<Runnable> getTasks() {
        List<Runnable> tasks = new ArrayList<Runnable>();

        Hilo01 task1 = new Hilo01();
        tasks.add(task1);

        Hilo02 task2 = new Hilo02();
        tasks.add(task2);
        return tasks;
    }

    private class Hilo01 extends Thread {

        @Override
        public void run() {
            System.out.println("HILO 1");
        }

    }

    private class Hilo02 extends Thread {

        @Override
        public void run() {
            try {
                sleep(2000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("HILO 2");
        }

    }


    public static void main(String[] args) {
        TestHilos test = new TestHilos();
        test.procesar();
    }
}

Просто, чтобы предоставить больше альтернатив здесь, чтобы использовать защелки / барьеры. Вы также можете получить частичные результаты, пока все они не закончат использовать CompletionService.

Из Java Concurrency на практике: "Если у вас есть пакет вычислений для отправки Исполнителю, и вы хотите получать их результаты по мере их появления, вы можете сохранить Future, связанное с каждой задачей, и многократно запрашивать завершение, вызывая get с Тайм-аут нулевой. Это возможно, но утомительно. К счастью, есть лучший способ: услуга завершения."

Здесь реализация

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}

Я только что написал пример программы, которая решает вашу проблему. Краткой реализации не дано, поэтому я добавлю ее. Пока вы можете использовать executor.shutdown() а также executor.awaitTermination()Это не лучшая практика, поскольку время, затрачиваемое разными потоками, будет непредсказуемым.

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

Синтаксис Try-with-Resources для службы исполнителя с Project Loom

Project Loom стремится добавить новые функции к возможностям параллелизма в Java.

Одна из этих функций - сделать . Это означает, что каждая реализация будет предлагатьметод. А это значит, что мы можем использовать синтаксис try-with-resources для автоматического закрытия ExecutorService объект.

В ExecutorService#closeблокируется до тех пор, пока все отправленные задачи не будут выполнены. С использованием close занимает место вызова shutdown & awaitTermination.

Существование AutoCloseableвносит свой вклад в попытку Project Loom привнести в Java «структурированный параллелизм» .

      try (
    ExecutorService executorService = Executors.… ;
) {
    // Submit your `Runnable`/`Callable` tasks to the executor service.
    …
}
// At this point, flow-of-control blocks until all submitted tasks are done/canceled/failed.
// After this point, the executor service will have been automatically shutdown, wia `close` method called by try-with-resources syntax.

Для получения дополнительной информации о Project Loom поищите разговоры и интервью, данные Роном Пресслером и другими членами команды Project Loom. Сосредоточьтесь на более свежем, поскольку Project Loom эволюционировал.

Сейчас доступны экспериментальные сборки технологии Project Loom , основанные на раннем доступе Java 17 .

Вы можете использовать этот код:

public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");

Поэтому я публикую свой ответ по связанному вопросу здесь, если кто-то хочет более простой способ сделать это

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.

Я создал следующий рабочий пример. Идея состоит в том, чтобы иметь способ обработать пул задач (в качестве примера я использую очередь) со многими потоками (определяемыми программно по количеству заданий / пороговому значению) и ожидать завершения всех потоков, чтобы продолжить какую-то другую обработку.

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Testing CountDownLatch and ExecutorService to manage scenario where
 * multiple Threads work together to complete tasks from a single
 * resource provider, so the processing can be faster. */
public class ThreadCountDown {

private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();

public static void main(String[] args) {
    // Create a queue with "Tasks"
    int numberOfTasks = 2000;
    while(numberOfTasks-- > 0) {
        tasks.add(numberOfTasks);
    }

    // Initiate Processing of Tasks
    ThreadCountDown main = new ThreadCountDown();
    main.process(tasks);
}

/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
    int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
    threadsCountdown = new CountDownLatch(numberOfThreads);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);

    //Initialize each Thread
    while(numberOfThreads-- > 0) {
        System.out.println("Initializing Thread: "+numberOfThreads);
        threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
    }

    try {
        //Shutdown the Executor, so it cannot receive more Threads.
        threadExecutor.shutdown();
        threadsCountdown.await();
        System.out.println("ALL THREADS COMPLETED!");
        //continue With Some Other Process Here
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
    int threshold = 100;
    int threads = size / threshold;
    if( size > (threads*threshold) ){
        threads++;
    }
    return threads;
}

/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
    return tasks.poll();
}

/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {

    private String threadName;

    protected MyThread(String threadName) {
        super();
        this.threadName = threadName;
    }

    @Override
    public void run() {
        Integer task;
        try{
            //Check in the Task pool if anything pending to process
            while( (task = getTask()) != null ){
                processTask(task);
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            /*Reduce count when no more tasks to process. Eventually all
            Threads will end-up here, reducing the count to 0, allowing
            the flow to continue after threadsCountdown.await(); */
            threadsCountdown.countDown();
        }
    }

    private void processTask(Integer task){
        try{
            System.out.println(this.threadName+" is Working on Task: "+ task);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}
}

Надеюсь, поможет!

Если вы используете больше потоков ExecutionServices ПОСЛЕДОВАТЕЛЬНО и хотите дождаться завершения КАЖДОЙ EXECUTIONSERVICE. Лучший способ описан ниже;

ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
   executer1.execute(new Runnable() {
            @Override
            public void run() {
                ...
            }
        });
} 
executer1.shutdown();

try{
   executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

   ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
   for (true) {
      executer2.execute(new Runnable() {
            @Override
            public void run() {
                 ...
            }
        });
   } 
   executer2.shutdown();
} catch (Exception e){
 ...
}

Ты должен использовать executorService.shutdown() а также executorService.awaitTermination метод.

Пример следующим образом:

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}

Вы можете использовать свой собственный подкласс ExecutorCompletionService для переноса taskExecutorи ваша собственная реализация BlockingQueue для получения информации о завершении каждой задачи и выполнения любого обратного вызова или другого действия, которое вы пожелаете, когда количество выполненных задач достигает желаемой цели.

Java 8 - Мы можем использовать потоковый API для обработки потока. Пожалуйста, посмотрите фрагмент ниже

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool

//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();

ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // doSomething();
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

Если doSomething() бросить некоторые другие исключения, latch.countDown() Кажется, не будет выполнять, так что мне делать?

Вы можете вызвать waitTillDone() в этом классе Runner:

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

Вы можете повторно использовать этот класс и вызывать waitTillDone() столько раз, сколько хотите, прежде чем вызывать shutdown(), плюс ваш код очень прост. Также вам не нужно знать количество заданий заранее.

Чтобы использовать это просто добавьте этот Gradle / Maven compile 'com.github.matejtymes:javafixes:1.1.1' зависимость от вашего проекта.

Более подробную информацию можно найти здесь:

https://github.com/MatejTymes/JavaFixes

http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

Это может помочь

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
                try {
                    Log.i(LOG_TAG, "Waiting for executor to terminate...");
                    if (executor.isTerminated())
                        break;
                    if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {}
            }

Есть метод в executor getActiveCount() - это дает количество активных потоков.

После охвата потока мы можем проверить, activeCount() значение 0, Если значение равно нулю, это означает, что в данный момент нет активных потоков, что означает, что задача завершена:

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}
Другие вопросы по тегам