Ждать пока все потоки не закончат свою работу в Java

Я пишу приложение, которое имеет 5 потоков, которые получают некоторую информацию из Интернета одновременно и заполняют 5 различных полей в буферном классе.
Мне нужно проверить данные буфера и сохранить их в базе данных, когда все потоки закончили свою работу.
Как я могу это сделать (получить уведомление, когда все потоки закончили свою работу)?

17 ответов

Решение

Подход, который я использую, заключается в использовании ExecutorService для управления пулами потоков.

ExecutorService es = Executors.newCachedThreadPool();
for(int i=0;i<5;i++)
    es.execute(new Runnable() { /*  your task */ });
es.shutdown();
boolean finshed = es.awaitTermination(1, TimeUnit.MINUTES);
// all tasks have finished or the time has been reached.

Вы можете join в темы. Блоки объединения, пока поток не завершится.

for (Thread thread : threads) {
    thread.join();
}

Обратите внимание, что join бросает InterruptedException, Вы должны решить, что делать, если это произойдет (например, попытаться отменить другие потоки, чтобы предотвратить ненужную работу).

Посмотрите на различные решения.

  1. join() API был представлен в ранних версиях Java. Начиная с выпуска JDK 1.5 доступны несколько хороших альтернатив для этого параллельного пакета.

  2. ExecutorService # invokeAll()

    Выполняет заданные задачи, возвращая список Фьючерсов с их статусом и результатами, когда все будет завершено.

    Обратитесь к этому связанному вопросу SE для примера кода:

    Как использовать invokeAll(), чтобы все пулы потоков выполняли свою задачу?

  3. CountDownLatch

    Средство синхронизации, которое позволяет одному или нескольким потокам ожидать завершения набора операций, выполняемых в других потоках.

    CountDownLatch инициализируется с заданным количеством. Блоки методов ожидания до тех пор, пока текущий счетчик не достигнет нуля из-за вызовов countDown() метод, после которого все ожидающие потоки освобождаются и любые последующие вызовы ожидают немедленного возврата. Это одноразовое явление - счет не может быть сброшен. Если вам нужна версия, которая сбрасывает счет, рассмотрите возможность использования CyclicBarrier.

    Обратитесь к этому вопросу для использования CountDownLatch

    Как ждать потока, который порождает свой собственный поток?

  4. ForkJoinPool или newWorkStealingPool() в исполнителях

  5. Перебирать все объекты Future, созданные после отправки в ExecutorService

Подождите / заблокируйте основной поток, пока другие потоки не завершат свою работу.

Как @Ravindra babu сказал, что этого можно добиться разными способами, но показав на примерах.

  • java.lang.Thread. join () Начиная с версии 1.0

    public static void joiningThreads() throws InterruptedException {
        Thread t1 = new Thread( new LatchTask(1, null), "T1" );
        Thread t2 = new Thread( new LatchTask(7, null), "T2" );
        Thread t3 = new Thread( new LatchTask(5, null), "T3" );
        Thread t4 = new Thread( new LatchTask(2, null), "T4" );
    
        // Start all the threads
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    
        // Wait till all threads completes
        t1.join();
        t2.join();
        t3.join();
        t4.join();
    }
    
  • java.util.concurrent.CountDownLatch Начиная с версии1.5

    • .countDown() "Уменьшает количество защелок.
    • .await() "Методы ожидания блокируются, пока текущий счетчик не достигнет нуля.

    Если вы создали latchGroupCount = 4 тогда countDown() должен быть вызван 4 раза, чтобы сделать счет 0. Итак, чтобы await() освободит блокирующие потоки.

    public static void latchThreads() throws InterruptedException {
        int latchGroupCount = 4;
        CountDownLatch latch = new CountDownLatch(latchGroupCount);
        Thread t1 = new Thread( new LatchTask(1, latch), "T1" );
        Thread t2 = new Thread( new LatchTask(7, latch), "T2" );
        Thread t3 = new Thread( new LatchTask(5, latch), "T3" );
        Thread t4 = new Thread( new LatchTask(2, latch), "T4" );
    
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    
        //latch.countDown();
    
        latch.await(); // block until latchGroupCount is 0.
    }
    

Пример кода класса Threaded LatchTask. Для проверки подхода используйтеjoiningThreads(); а также latchThreads(); из основного метода.

class LatchTask extends Thread {
    CountDownLatch latch;
    int iterations = 10;
    public LatchTask(int iterations, CountDownLatch latch) {
        this.iterations = iterations;
        this.latch = latch;
    }

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " : Started Task...");

        for (int i = 0; i < iterations; i++) {
            System.out.println(threadName + " : " + i);
            MainThread_Wait_TillWorkerThreadsComplete.sleep(1);
        }
        System.out.println(threadName + " : Completed Task");
        // countDown() « Decrements the count of the latch group.
        if(latch != null)
            latch.countDown();
    }
}
  • CyclicBarriers - средство синхронизации, которое позволяет набору потоков ждать друг друга, чтобы достичь общей точки барьера.CyclicBarriers полезны в программах, включающих группу потоков фиксированного размера, которые время от времени должны ждать друг друга. Барьер называется циклическим, потому что его можно повторно использовать после освобождения ожидающих потоков.
    CyclicBarrier barrier = new CyclicBarrier(3);
    barrier.await();
    
    Например, обратитесь к этому классу Concurrent_ParallelNotifyies.

  • Фреймворк Executer: мы можем использовать ExecutorService для создания пула потоков и отслеживать ход выполнения асинхронных задач с помощью Future.

    • submit(Runnable), submit(Callable)которые возвращают Future Object. Используяfuture.get() функцию мы можем заблокировать основной поток, пока рабочие потоки не завершат свою работу.

    • invokeAll(...) - возвращает список объектов Future, с помощью которых вы можете получить результаты выполнения каждого Callable.

Найдите пример использования интерфейсов Runnable, Callable с платформой Executor.


@Смотрите также

Помимо Thread.join() По предложению других, Java 5 представила среду исполнения. Там вы не работаете с Thread объекты. Вместо этого вы отправляете Callable или же Runnable возражает против исполнителя. Есть специальный исполнитель, который предназначен для выполнения нескольких задач и выдачи их результатов не по порядку. Это ExecutorCompletionService:

ExecutorCompletionService executor;
for (..) {
    executor.submit(Executors.callable(yourRunnable));
}

Тогда вы можете повторно позвонить take() пока нет больше Future<?> возвращаемые объекты, что означает, что все они завершены.


Еще одна вещь, которая может быть актуальной, в зависимости от вашего сценария CyclicBarrier,

Средство синхронизации, которое позволяет всем потокам ожидать друг друга, чтобы достичь общей барьерной точки. CyclicBarriers полезны в программах, включающих группу потоков фиксированного размера, которые иногда должны ждать друг друга. Барьер называется циклическим, потому что его можно использовать повторно после освобождения ожидающих потоков.

Другая возможность CountDownLatch объект, который полезен для простых ситуаций: поскольку вы заранее знаете количество потоков, вы инициализируете его соответствующим количеством и передаете ссылку на объект каждому потоку.
По завершении своей задачи каждый поток вызывает CountDownLatch.countDown() который уменьшает внутренний счетчик. Основной поток, после запуска всех остальных, должен сделать CountDownLatch.await() блокировка вызова. Он будет выпущен, как только внутренний счетчик достигнет 0.

Обратите внимание, что с этим объектом InterruptedException может быть также брошено.

Ты сделаешь

for (Thread t : new Thread[] { th1, th2, th3, th4, th5 })
    t.join()

После этого цикла for вы можете быть уверены, что все потоки завершили свою работу.

Сохраните объекты Thread в некоторой коллекции (например, List или Set), затем переберите коллекцию после запуска потоков и вызовите join() в Threads.

Я создал небольшой вспомогательный метод, чтобы дождаться завершения нескольких потоков:

public static void waitForThreadsToFinish(Thread... threads) {
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

Вы можете использовать метод ThreadF#join для этой цели.

Хотя это не относится к проблеме OP, если вы заинтересованы в синхронизации (точнее, рандеву) только с одним потоком, вы можете использовать обменник

В моем случае мне нужно было приостановить родительский поток, пока дочерний поток не сделал что-то, например, завершил свою инициализацию. CountDownLatch также хорошо работает.

У меня была похожая проблема, и в итоге я использовал Java 8 parallelStream.

requestList.parallelStream().forEach(req -> makeRequest(req));

Это супер просто и читабельно. За кулисами он использует пул форк-соединений JVM по умолчанию, что означает, что он будет ждать завершения всех потоков, прежде чем продолжить. Для моего случая это было аккуратное решение, потому что это был единственный параллельный поток в моем приложении. Если у вас одновременно работает несколько параллельных потоков, прочитайте ссылку ниже.

Подробнее о параллельных потоках здесь.

Попробуй это, сработает.

  Thread[] threads = new Thread[10];

  List<Thread> allThreads = new ArrayList<Thread>();

  for(Thread thread : threads){

        if(null != thread){

              if(thread.isAlive()){

                    allThreads.add(thread);

              }

        }

  }

  while(!allThreads.isEmpty()){

        Iterator<Thread> ite = allThreads.iterator();

        while(ite.hasNext()){

              Thread thread = ite.next();

              if(!thread.isAlive()){

                   ite.remove();
              }

        }

   }

Служба исполнителя может использоваться для управления несколькими потоками, включая состояние и завершение. Смотрите http://programmingexamples.wikidot.com/executorservice

Существующие ответы могут join() каждая нить.

Но есть несколько способов получить массив / список потоков:

  • Добавьте тему в список при создании.
  • использование ThreadGroup управлять потоками.

Следующий код будет использовать ThreadGruop подход. Сначала создается группа, затем при создании каждого потока указывается группа в конструкторе, позже можно получить массив потоков через ThreadGroup.enumerate()


Код

SyncBlockLearn.java

import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * synchronized block - learn,
 *
 * @author eric
 * @date Apr 20, 2015 1:37:11 PM
 */
public class SyncBlockLearn {
    private static final int TD_COUNT = 5; // thread count
    private static final int ROUND_PER_THREAD = 100; // round for each thread,
    private static final long INC_DELAY = 10; // delay of each increase,

    // sync block test,
    @Test
    public void syncBlockTest() throws InterruptedException {
        Counter ct = new Counter();
        ThreadGroup tg = new ThreadGroup("runner");

        for (int i = 0; i < TD_COUNT; i++) {
            new Thread(tg, ct, "t-" + i).start();
        }

        Thread[] tArr = new Thread[TD_COUNT];
        tg.enumerate(tArr); // get threads,

        // wait all runner to finish,
        for (Thread t : tArr) {
            t.join();
        }

        System.out.printf("\nfinal count: %d\n", ct.getCount());
        Assert.assertEquals(ct.getCount(), TD_COUNT * ROUND_PER_THREAD);
    }

    static class Counter implements Runnable {
        private final Object lkOn = new Object(); // the object to lock on,
        private int count = 0;

        @Override
        public void run() {
            System.out.printf("[%s] begin\n", Thread.currentThread().getName());

            for (int i = 0; i < ROUND_PER_THREAD; i++) {
                synchronized (lkOn) {
                    System.out.printf("[%s] [%d] inc to: %d\n", Thread.currentThread().getName(), i, ++count);
                }
                try {
                    Thread.sleep(INC_DELAY); // wait a while,
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.printf("[%s] end\n", Thread.currentThread().getName());
        }

        public int getCount() {
            return count;
        }
    }
}

Основной поток будет ожидать завершения всех потоков в группе.

У меня была аналогичная ситуация, когда мне приходилось ждать, пока все дочерние потоки завершат свое выполнение, тогда только я мог получить результат состояния для каждого из них. Следовательно, мне нужно было дождаться завершения всего дочернего потока.

ниже мой код, где я использовал многопоточность, используя

       public static void main(String[] args) {
        List<RunnerPojo> testList = ExcelObject.getTestStepsList();//.parallelStream().collect(Collectors.toList());
        int threadCount = ConfigFileReader.getInstance().readConfig().getParallelThreadCount();
        System.out.println("Thread count is : =========  " + threadCount); // 5

        ExecutorService threadExecutor = new DriverScript().threadExecutor(testList, threadCount);


        boolean isProcessCompleted = waitUntilCondition(() -> threadExecutor.isTerminated()); // Here i used waitUntil condition 

        if (isProcessCompleted) {
            testList.forEach(x -> {
                System.out.println("Test Name: " + x.getTestCaseId());
                System.out.println("Test Status : " + x.getStatus());
                System.out.println("======= Test Steps ===== ");

                x.getTestStepsList().forEach(y -> {
                    System.out.println("Step Name: " + y.getDescription());
                    System.out.println("Test caseId : " + y.getTestCaseId());
                    System.out.println("Step Status: " + y.getResult());
                    System.out.println("\n ============ ==========");
                });
            });
        }

Ниже приведен метод распределения списка с параллельной обработкой.

      // This method will split my list and run in a parallel process with mutliple threads
    private ExecutorService threadExecutor(List<RunnerPojo> testList, int threadSize) {
        ExecutorService exec = Executors.newFixedThreadPool(threadSize);
        testList.forEach(tests -> {
            exec.submit(() -> {
                driverScript(tests);
            });
        });
        exec.shutdown();
        return exec;
    }

Это мой метод ожидания: здесь вы можете подождать, пока ваше условие не выполнится в цикле do while. в моем случае я ждал максимального тайм-аута. это будет продолжать проверять, пока ваш threadExecutor.isTerminated()является trueс периодом опроса 5 сек.

          static boolean waitUntilCondition(Supplier<Boolean> function) {
        Double timer = 0.0;
        Double maxTimeOut = 20.0;

        boolean isFound;
        do {
            isFound = function.get();
            if (isFound) {
                break;
            } else {
                try {
                    Thread.sleep(5000); // Sleeping for 5 sec (main thread will sleep for 5 sec)
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                timer++;
                System.out.println("Waiting for condition to be true .. waited .." + timer * 5 + " sec.");
            }
        } while (timer < maxTimeOut + 1.0);

        return isFound;
    }

Используйте это в вашем основном потоке: while(! Executor.isTeridity ()); Поместите эту строку кода после запуска всех потоков из службы executor. Это запустит основной поток только после завершения всех потоков, запущенных исполнителями. Обязательно вызовите executor.shutdown(); перед вышеупомянутым циклом.

Другие вопросы по тегам