Как CountDownLatch используется в многопоточности Java?

Может кто-нибудь помочь мне понять, что такое Java CountDownLatch есть и когда его использовать?

У меня нет четкого представления о том, как работает эта программа. Как я понимаю, все три потока запускаются одновременно, и каждый поток вызовет CountDownLatch через 3000 мс. Так что обратный отсчет будет уменьшаться один за другим. После того, как защелка станет нулевой, программа напечатает "Завершено". Может быть, то, как я понял, неверно.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Processor implements Runnable {
    private CountDownLatch latch;

    public Processor(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        System.out.println("Started.");

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        latch.countDown();
    }
}

// ------------------------------------------------ -----

public class App {

    public static void main(String[] args) {

        CountDownLatch latch = new CountDownLatch(3); // coundown from 3 to 0

        ExecutorService executor = Executors.newFixedThreadPool(3); // 3 Threads in pool

        for(int i=0; i < 3; i++) {
            executor.submit(new Processor(latch)); // ref to latch. each time call new Processes latch will count down by 1
        }

        try {
            latch.await();  // wait until latch counted down to 0
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Completed.");
    }

}

13 ответов

Решение

Да, вы правильно поняли. CountDownLatch работает по принципу защелки, основной поток будет ждать, пока ворота не откроются. Один поток ожидает n потоков, указанных при создании CountDownLatch,

Любой поток, обычно основной поток приложения, который вызывает CountDownLatch.await() будет ждать, пока счетчик не достигнет нуля или не прервется другим потоком. Все остальные потоки обязаны вести обратный отсчет путем вызова CountDownLatch.countDown() как только они завершены или готовы.

Как только счетчик достигает нуля, ожидающий поток продолжается. Один из недостатков / преимуществ CountDownLatch является то, что он не может быть повторно использован: как только счет достигает нуля, вы не можете использовать CountDownLatch больше

Редактировать:

использование CountDownLatch когда одному потоку (например, основному потоку) требуется дождаться завершения одного или нескольких потоков, прежде чем он сможет продолжить обработку.

Классический пример использования CountDownLatch в Java - это базовое Java-приложение на стороне сервера, которое использует архитектуру служб, где несколько служб предоставляются несколькими потоками, и приложение не может начать обработку, пока все службы не будут запущены успешно.

Вопрос PS OP имеет довольно простой пример, поэтому я его не включил.

CountDownLatch в Java это тип синхронизатора, который позволяет Thread ждать одного или нескольких Threadдо начала обработки.

CountDownLatch работает по принципу защелки, поток будет ждать, пока ворота не откроются. Одна нить ждет n количество потоков, указанное при создании CountDownLatch,

например final CountDownLatch latch = new CountDownLatch(3);

Здесь мы устанавливаем счетчик на 3.

Любой поток, обычно основной поток приложения, который вызывает CountDownLatch.await() будет ждать, пока счетчик не достигнет нуля или не прервется другим Thread, Все остальные потоки обязаны делать обратный отсчет, вызывая CountDownLatch.countDown() как только они завершены или готовы к работе. как только счет достигает нуля, Thread ожидание начинает работать.

Здесь счет уменьшается на CountDownLatch.countDown() метод.

Thread который вызывает await() Метод будет ждать, пока начальный счет не достигнет нуля.

Чтобы счетчик равнялся нулю, другим потокам нужно вызвать countDown() метод. Как только счет становится нулем, поток, который вызвал await() метод возобновится (начнется его выполнение).

Недостаток CountDownLatch в том, что он не может быть использован повторно: как только счет станет нулевым, он больше не будет использоваться.

Он используется, когда мы хотим подождать, пока несколько потоков выполнят свою задачу. Это похоже на присоединение к темам.

Где мы можем использовать CountDownLatch

Рассмотрим сценарий, в котором у нас есть требование, когда у нас есть три потока "A", "B" и "C", и мы хотим запустить поток "C" только тогда, когда потоки "A" и "B" завершают или частично завершают свою задачу.

Это может быть применено к реальному сценарию ИТ

Рассмотрим сценарий, в котором менеджер разделяет модули между группами разработчиков (A и B) и хочет назначить их команде QA для тестирования только тогда, когда обе команды выполнят свою задачу.

public class Manager {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        MyDevTeam teamDevA = new MyDevTeam(countDownLatch, "devA");
        MyDevTeam teamDevB = new MyDevTeam(countDownLatch, "devB");
        teamDevA.start();
        teamDevB.start();
        countDownLatch.await();
        MyQATeam qa = new MyQATeam();
        qa.start();
    }   
}

class MyDevTeam extends Thread {   
    CountDownLatch countDownLatch;
    public MyDevTeam (CountDownLatch countDownLatch, String name) {
        super(name);
        this.countDownLatch = countDownLatch;       
    }   
    @Override
    public void run() {
        System.out.println("Task assigned to development team " + Thread.currentThread().getName());
        try {
                Thread.sleep(2000);
        } catch (InterruptedException ex) {
                ex.printStackTrace();
        }
    System.out.println("Task finished by development team Thread.currentThread().getName());
            this.countDownLatch.countDown();
    }
}

class MyQATeam extends Thread {   
    @Override
    public void run() {
        System.out.println("Task assigned to QA team");
        try {
                Thread.sleep(2000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        System.out.println("Task finished by QA team");
    }
}

Вывод вышеуказанного кода будет:

Задача, поставленная перед командой разработчиков devB

Задача, поставленная перед командой разработчиков devA

Задача выполнена командой разработчиков devB

Задача выполнена командой разработчиков devA

Задача, назначенная команде QA

Задача выполнена командой QA

Здесь метод await() ожидает, пока флаг countdownlatch станет равным 0, а метод countDown() уменьшит флаг countdownlatch на 1.

Ограничение JOIN: Приведенный выше пример также может быть достигнут с помощью JOIN, но JOIN нельзя использовать в двух сценариях:

  1. Когда мы используем ExecutorService вместо класса Thread для создания потоков.
  2. Измените приведенный выше пример, где Manager хочет передать код команде QA, как только Development выполнит свою задачу на 80%. Это означает, что CountDownLatch позволяет нам изменять реализацию, которую можно использовать для ожидания другого потока для их частичного выполнения.

NikolaB объяснил это очень хорошо, однако пример был бы полезен для понимания, так что вот один простой пример...

 import java.util.concurrent.*;


  public class CountDownLatchExample {

  public static class ProcessThread implements Runnable {

    CountDownLatch latch;
    long workDuration;
    String name;

    public ProcessThread(String name, CountDownLatch latch, long duration){
        this.name= name;
        this.latch = latch;
        this.workDuration = duration;
    }


    public void run() {
        try {
            System.out.println(name +" Processing Something for "+ workDuration/1000 + " Seconds");
            Thread.sleep(workDuration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name+ "completed its works");
        //when task finished.. count down the latch count...

        // basically this is same as calling lock object notify(), and object here is latch
        latch.countDown();
    }
}


public static void main(String[] args) {
    // Parent thread creating a latch object
    CountDownLatch latch = new CountDownLatch(3);

    new Thread(new ProcessThread("Worker1",latch, 2000)).start(); // time in millis.. 2 secs
    new Thread(new ProcessThread("Worker2",latch, 6000)).start();//6 secs
    new Thread(new ProcessThread("Worker3",latch, 4000)).start();//4 secs


    System.out.println("waiting for Children processes to complete....");
    try {
        //current thread will get notified if all chidren's are done 
        // and thread will resume from wait() mode.
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("All Process Completed....");

    System.out.println("Parent Thread Resuming work....");



     }
  }

CoundDownLatch позволяет вам заставить поток ждать, пока все остальные потоки не завершат свое выполнение.

Псевдокод может быть:

// Main thread starts
// Create CountDownLatch for N threads
// Create and start N threads
// Main thread waits on latch
// N threads completes there tasks are returns
// Main thread resume execution

Этот пример из Java Doc помог мне понять концепции:

class Driver { // ...
  void main() throws InterruptedException {
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneSignal = new CountDownLatch(N);

    for (int i = 0; i < N; ++i) // create and start threads
      new Thread(new Worker(startSignal, doneSignal)).start();

    doSomethingElse();            // don't let run yet
    startSignal.countDown();      // let all threads proceed
    doSomethingElse();
    doneSignal.await();           // wait for all to finish
  }
}

class Worker implements Runnable {
  private final CountDownLatch startSignal;
  private final CountDownLatch doneSignal;
  Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
  }
  public void run() {
     try {
       startSignal.await();
       doWork();
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
  }

  void doWork() { ... }
}

Схематически говоря:

Очевидно, CountDownLatch позволяет один поток (здесь Driver) ждать, пока куча запущенных потоков (здесь Worker) сделано с их исполнением.

Как упоминалось в JavaDoc ( https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html), CountDownLatch - это средство синхронизации, представленное в Java 5. Здесь синхронизация не означает ограничение доступа к критическому разделу. Но скорее последовательность действий разных потоков. Тип синхронизации, достигнутый с помощью CountDownLatch, аналогичен типу Join. Предположим, что существует поток "M", который должен ждать, пока другие рабочие потоки "T1", "T2", "T3" завершат свои задачи. До Java 1.5 это можно было сделать следующим образом: M выполняет следующий код

    T1.join();
    T2.join();
    T3.join();

Приведенный выше код гарантирует, что поток M возобновит свою работу после того, как T1, T2, T3 завершат свою работу. T1, T2, T3 могут завершить свою работу в любом порядке. То же самое может быть достигнуто с помощью CountDownLatch, где T1, T2, T3 и поток M совместно используют один и тот же объект CountDownLatch.
"М" просит: countDownLatch.await();
где, как "Т1", "Т2", "Т3" делает countDownLatch.countdown();

Недостатком метода соединения является то, что M должен знать о T1, T2, T3. Если позже будет добавлен новый рабочий поток T4, то M тоже должен об этом знать. Этого можно избежать с помощью CountDownLatch. После реализации последовательность действий будет [T1,T2,T3](порядок T1, T2, T3 может быть в любом случае) -> [M]

Из документации оракула о CountDownLatch:

Средство синхронизации, которое позволяет одному или нескольким потокам ожидать завершения набора операций, выполняемых в других потоках.

CountDownLatch инициализируется с заданным количеством. await методы блокируются, пока текущий счетчик не достигнет нуля из-за вызовов countDown() метод, после которого все ожидающие потоки освобождаются и любые последующие вызовы ожидают немедленного возврата. Это одноразовое явление - счет не может быть сброшен.

CountDownLatch - это универсальный инструмент синхронизации, который можно использовать для различных целей.

CountDownLatch инициализированный с счетчиком один служит простой защелкой включения / выключения, или воротами: все вызывающие потоки ожидают в воротах, пока они не будут открыты потоком, вызывающим countDown().

CountDownLatch инициализированный N может использоваться, чтобы заставить один поток ждать, пока N потоков не выполнили какое-либо действие, или некоторое действие не было выполнено N раз.

public void await()
           throws InterruptedException

Заставляет текущий поток ждать, пока защелка не обратится к нулю, если поток не прерывается.

Если текущий счетчик равен нулю, этот метод возвращается немедленно.

public void countDown()

Уменьшает счетчик защелки, освобождая все ожидающие потоки, если счет достигает нуля.

Если текущий счетчик больше нуля, то он уменьшается. Если новый счетчик равен нулю, то все ожидающие потоки повторно включаются для целей планирования потоков.

Объяснение вашего примера.

  1. Вы установили счет как 3 для latch переменная

    CountDownLatch latch = new CountDownLatch(3);
    
  2. Вы прошли этот общий доступ latch Рабочему потоку: Processor

  3. Три Runnable случаи Processor были представлены ExecutorServiceexecutor
  4. Основная тема (App) ожидает подсчета, чтобы стать нулем с оператором ниже

     latch.await();  
    
  5. Processor поток спит в течение 3 секунд, а затем уменьшает значение счетчика с latch.countDown()
  6. Первый Process экземпляр изменит количество защелок на 2 после его завершения из-за latch.countDown(),

  7. второй Process экземпляр изменит счетчик защелок на 1 после его завершения из-за latch.countDown(),

  8. В третьих Process Экземпляр изменит счетчик защелок на 0 после его завершения из-за latch.countDown(),

  9. Нулевой отсчет на защелке вызывает основной поток App выйти из await

  10. Приложение App теперь печатает этот вывод: Completed

Один хороший пример того, когда использовать что-то подобное, - это Java Simple Serial Connector для доступа к последовательным портам. Обычно вы что-то записываете в порт, и асинхронно в другом потоке устройство отвечает SerialPortEventListener. Как правило, вы хотите сделать паузу после записи в порт, чтобы дождаться ответа. Ручная обработка потоков для этого сценария чрезвычайно сложна, но использовать Countdownlatch легко. Прежде чем думать, что вы можете сделать это по-другому, будьте осторожны с условиями гонки, о которых вы никогда не думали!!

псевдокод:

CountDownLatch latch;
void writeData() { 
   latch = new CountDownLatch(1);
   serialPort.writeBytes(sb.toString().getBytes())
   try {
      latch.await(4, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
   }
}
class SerialPortReader implements SerialPortEventListener {
    public void serialEvent(SerialPortEvent event) {
        if(event.isRXCHAR()){//If data is available
            byte buffer[] = serialPort.readBytes(event.getEventValue());
            latch.countDown();
         }
     }
}

Если вы добавите отладку после вызова latch.countDown(), это может помочь вам лучше понять его поведение.

latch.countDown();
System.out.println("DONE "+this.latch); // Add this debug

Выходные данные покажут, что число уменьшается. Это 'count' - это фактически количество запущенных задач (объектов Processor), с которыми вы начали, countDown () не был вызван и, следовательно, блокируется основным потоком при вызове latch.await().

DONE java.util.concurrent.CountDownLatch@70e69696[Count = 2]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 1]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 0]
package practice;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch c= new CountDownLatch(3);  // need to decrements the count (3) to zero by calling countDown() method so that main thread will wake up after calling await() method 
        Task t = new Task(c);
        Task t1 = new Task(c);
        Task t2 = new Task(c);
        t.start();
        t1.start();
        t2.start();
        c.await(); // when count becomes zero main thread will wake up 
        System.out.println("This will print after count down latch count become zero");
    }
}

class Task extends Thread{
    CountDownLatch c;

    public Task(CountDownLatch c) {
        this.c = c;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName());
            Thread.sleep(1000);
            c.countDown();   // each thread decrement the count by one 
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Лучший пример реального времени для countDownLatch, объясненный в этой ссылке CountDownLatchExample

Лучший вариант CyclicBarrier, согласно https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html См.:

CountDownLatch инициализируется заданным счетчиком. Методы await блокируются до тех пор, пока текущий счетчик не достигнет нуля из-за вызовов метода countDown(), после чего все ожидающие потоки освобождаются, а все последующие вызовы await возвращаются немедленно. Это одноразовое явление — счетчик не может быть сброшен. Если вам нужна версия, которая сбрасывает счетчик, рассмотрите возможность использования CyclicBarrier.

Другие вопросы по тегам