Как я могу проверить, все ли задачи были выполнены (нормально или внезапно)?

У меня есть следующий класс:

public class Service{
    private static final ExecutorService executor = Executors.newFixedThreadPool(4);

    public synchronized void execute(Collection<Runnable> runs){
        for(Runnable r: runs){
            executor.execute(r);
        }
    }

    public boolean isComplete(){
        //should return true iff all tasks applied by the execute(Collection<Runnable> runs) 
        //method're finished their execution (normally or abruptly, 
        //it doesn matter)
    }
}

Как я могу реализовать isComplete() метод. Мне нужно проверить, есть ли задача, которая в данный момент выполняется. Если исполнитель очищен (все задачи выполнены), метод должен вернуть true, в противном случае вернуть false.

2 ответа

Решение

Учитывая, что вы используете ExecutorService вы могли бы использовать submit вместо execute и хранить возвращенные Future в списке. Тогда внутри isComplete() перебрать все Future и позвоните isDone() на каждом. (Этот подход также позволит вам отменить отправленные задачи, если это необходимо, через Future с).

Например

class Service{
    private static final ExecutorService executor = Executors.newFixedThreadPool(4);
    private List<Future<?>> futures;
    public void execute(Collection<Runnable> runs){
        futures = runs.stream()
                .map(r -> executor.submit(r))
                .collect(Collectors.toList());
    }

    public boolean isComplete(){
        for (Future<?> future : futures) 
            if (!future.isDone()) return false;

        return true;
    }
}

В зависимости от варианта использования вы можете повысить производительность, удалив элементы из futures список, но вы (возможно) должны синхронизировать isComplete метод:

    public synchronized boolean isComplete(){
        Iterator<Future<?>> itr = futures.iterator();
        while (itr.hasNext()) {
            if (itr.next().isDone()) itr.remove();
            else return false;
        }
        return true;
    }

Поскольку этот пример кода написан, предполагается, что вы сделаете только один вызов execute за каждый случай Service так что не надо synchronized , Если у вас будет несколько одновременных абонентов execute на каждой Service Например, обратите внимание, что это заменит futures список при каждом вызове execute , Вы можете справиться с этим, сделав класс одноразовым или добавив к фьючерсам. Это полностью зависит от вашего варианта использования.

Вы можете позвонить shutdown() метод, чтобы попросить исполнителя прекратить все потоки и закрыть пул. А потом позвони isTerminated() который вернет true, если все потоки завершены.

Если вы хотите заблокировать выполнение, вы можете использовать awaitTerminationMethod(), и наконец shutDownNow() будет завершать пул независимо от того, завершено ли выполнение потоков или нет.

CountDownLatch решил мою проблему.

ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // ...
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}
 
// wait for the latch to be decremented by the two remaining threads
latch.await();
Другие вопросы по тегам