Как CountDownLatch используется в многопоточности Java?
Может кто-нибудь помочь мне понять, что такое Java CountDownLatch
есть и когда его использовать?
У меня нет четкого представления о том, как работает эта программа. Как я понимаю, все три потока запускаются одновременно, и каждый поток вызовет CountDownLatch через 3000 мс. Так что обратный отсчет будет уменьшаться один за другим. После того, как защелка станет нулевой, программа напечатает "Завершено". Может быть, то, как я понял, неверно.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class Processor implements Runnable {
private CountDownLatch latch;
public Processor(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
System.out.println("Started.");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
}
}
// ------------------------------------------------ -----
public class App {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(3); // coundown from 3 to 0
ExecutorService executor = Executors.newFixedThreadPool(3); // 3 Threads in pool
for(int i=0; i < 3; i++) {
executor.submit(new Processor(latch)); // ref to latch. each time call new Processes latch will count down by 1
}
try {
latch.await(); // wait until latch counted down to 0
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Completed.");
}
}
13 ответов
Да, вы правильно поняли. CountDownLatch
работает по принципу защелки, основной поток будет ждать, пока ворота не откроются. Один поток ожидает n потоков, указанных при создании CountDownLatch
,
Любой поток, обычно основной поток приложения, который вызывает CountDownLatch.await()
будет ждать, пока счетчик не достигнет нуля или не прервется другим потоком. Все остальные потоки обязаны вести обратный отсчет путем вызова CountDownLatch.countDown()
как только они завершены или готовы.
Как только счетчик достигает нуля, ожидающий поток продолжается. Один из недостатков / преимуществ CountDownLatch
является то, что он не может быть повторно использован: как только счет достигает нуля, вы не можете использовать CountDownLatch
больше
Редактировать:
использование CountDownLatch
когда одному потоку (например, основному потоку) требуется дождаться завершения одного или нескольких потоков, прежде чем он сможет продолжить обработку.
Классический пример использования CountDownLatch
в Java - это базовое Java-приложение на стороне сервера, которое использует архитектуру служб, где несколько служб предоставляются несколькими потоками, и приложение не может начать обработку, пока все службы не будут запущены успешно.
Вопрос PS OP имеет довольно простой пример, поэтому я его не включил.
CountDownLatch
в Java это тип синхронизатора, который позволяет Thread
ждать одного или нескольких Thread
до начала обработки.
CountDownLatch
работает по принципу защелки, поток будет ждать, пока ворота не откроются. Одна нить ждет n
количество потоков, указанное при создании CountDownLatch
,
например final CountDownLatch latch = new CountDownLatch(3);
Здесь мы устанавливаем счетчик на 3.
Любой поток, обычно основной поток приложения, который вызывает CountDownLatch.await()
будет ждать, пока счетчик не достигнет нуля или не прервется другим Thread
, Все остальные потоки обязаны делать обратный отсчет, вызывая CountDownLatch.countDown()
как только они завершены или готовы к работе. как только счет достигает нуля, Thread
ожидание начинает работать.
Здесь счет уменьшается на CountDownLatch.countDown()
метод.
Thread
который вызывает await()
Метод будет ждать, пока начальный счет не достигнет нуля.
Чтобы счетчик равнялся нулю, другим потокам нужно вызвать countDown()
метод. Как только счет становится нулем, поток, который вызвал await()
метод возобновится (начнется его выполнение).
Недостаток CountDownLatch
в том, что он не может быть использован повторно: как только счет станет нулевым, он больше не будет использоваться.
Он используется, когда мы хотим подождать, пока несколько потоков выполнят свою задачу. Это похоже на присоединение к темам.
Где мы можем использовать CountDownLatch
Рассмотрим сценарий, в котором у нас есть требование, когда у нас есть три потока "A", "B" и "C", и мы хотим запустить поток "C" только тогда, когда потоки "A" и "B" завершают или частично завершают свою задачу.
Это может быть применено к реальному сценарию ИТ
Рассмотрим сценарий, в котором менеджер разделяет модули между группами разработчиков (A и B) и хочет назначить их команде QA для тестирования только тогда, когда обе команды выполнят свою задачу.
public class Manager {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
MyDevTeam teamDevA = new MyDevTeam(countDownLatch, "devA");
MyDevTeam teamDevB = new MyDevTeam(countDownLatch, "devB");
teamDevA.start();
teamDevB.start();
countDownLatch.await();
MyQATeam qa = new MyQATeam();
qa.start();
}
}
class MyDevTeam extends Thread {
CountDownLatch countDownLatch;
public MyDevTeam (CountDownLatch countDownLatch, String name) {
super(name);
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("Task assigned to development team " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("Task finished by development team Thread.currentThread().getName());
this.countDownLatch.countDown();
}
}
class MyQATeam extends Thread {
@Override
public void run() {
System.out.println("Task assigned to QA team");
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("Task finished by QA team");
}
}
Вывод вышеуказанного кода будет:
Задача, поставленная перед командой разработчиков devB
Задача, поставленная перед командой разработчиков devA
Задача выполнена командой разработчиков devB
Задача выполнена командой разработчиков devA
Задача, назначенная команде QA
Задача выполнена командой QA
Здесь метод await() ожидает, пока флаг countdownlatch станет равным 0, а метод countDown() уменьшит флаг countdownlatch на 1.
Ограничение JOIN: Приведенный выше пример также может быть достигнут с помощью JOIN, но JOIN нельзя использовать в двух сценариях:
- Когда мы используем ExecutorService вместо класса Thread для создания потоков.
- Измените приведенный выше пример, где Manager хочет передать код команде QA, как только Development выполнит свою задачу на 80%. Это означает, что CountDownLatch позволяет нам изменять реализацию, которую можно использовать для ожидания другого потока для их частичного выполнения.
NikolaB объяснил это очень хорошо, однако пример был бы полезен для понимания, так что вот один простой пример...
import java.util.concurrent.*;
public class CountDownLatchExample {
public static class ProcessThread implements Runnable {
CountDownLatch latch;
long workDuration;
String name;
public ProcessThread(String name, CountDownLatch latch, long duration){
this.name= name;
this.latch = latch;
this.workDuration = duration;
}
public void run() {
try {
System.out.println(name +" Processing Something for "+ workDuration/1000 + " Seconds");
Thread.sleep(workDuration);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name+ "completed its works");
//when task finished.. count down the latch count...
// basically this is same as calling lock object notify(), and object here is latch
latch.countDown();
}
}
public static void main(String[] args) {
// Parent thread creating a latch object
CountDownLatch latch = new CountDownLatch(3);
new Thread(new ProcessThread("Worker1",latch, 2000)).start(); // time in millis.. 2 secs
new Thread(new ProcessThread("Worker2",latch, 6000)).start();//6 secs
new Thread(new ProcessThread("Worker3",latch, 4000)).start();//4 secs
System.out.println("waiting for Children processes to complete....");
try {
//current thread will get notified if all chidren's are done
// and thread will resume from wait() mode.
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("All Process Completed....");
System.out.println("Parent Thread Resuming work....");
}
}
CoundDownLatch позволяет вам заставить поток ждать, пока все остальные потоки не завершат свое выполнение.
Псевдокод может быть:
// Main thread starts
// Create CountDownLatch for N threads
// Create and start N threads
// Main thread waits on latch
// N threads completes there tasks are returns
// Main thread resume execution
Этот пример из Java Doc помог мне понять концепции:
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
Схематически говоря:
Очевидно, CountDownLatch
позволяет один поток (здесь Driver
) ждать, пока куча запущенных потоков (здесь Worker
) сделано с их исполнением.
Как упоминалось в JavaDoc ( https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html), CountDownLatch - это средство синхронизации, представленное в Java 5. Здесь синхронизация не означает ограничение доступа к критическому разделу. Но скорее последовательность действий разных потоков. Тип синхронизации, достигнутый с помощью CountDownLatch, аналогичен типу Join. Предположим, что существует поток "M", который должен ждать, пока другие рабочие потоки "T1", "T2", "T3" завершат свои задачи. До Java 1.5 это можно было сделать следующим образом: M выполняет следующий код
T1.join();
T2.join();
T3.join();
Приведенный выше код гарантирует, что поток M возобновит свою работу после того, как T1, T2, T3 завершат свою работу. T1, T2, T3 могут завершить свою работу в любом порядке. То же самое может быть достигнуто с помощью CountDownLatch, где T1, T2, T3 и поток M совместно используют один и тот же объект CountDownLatch.
"М" просит: countDownLatch.await();
где, как "Т1", "Т2", "Т3" делает countDownLatch.countdown();
Недостатком метода соединения является то, что M должен знать о T1, T2, T3. Если позже будет добавлен новый рабочий поток T4, то M тоже должен об этом знать. Этого можно избежать с помощью CountDownLatch. После реализации последовательность действий будет [T1,T2,T3](порядок T1, T2, T3 может быть в любом случае) -> [M]
Из документации оракула о CountDownLatch:
Средство синхронизации, которое позволяет одному или нескольким потокам ожидать завершения набора операций, выполняемых в других потоках.
CountDownLatch
инициализируется с заданным количеством. await
методы блокируются, пока текущий счетчик не достигнет нуля из-за вызовов countDown()
метод, после которого все ожидающие потоки освобождаются и любые последующие вызовы ожидают немедленного возврата. Это одноразовое явление - счет не может быть сброшен.
CountDownLatch - это универсальный инструмент синхронизации, который можно использовать для различных целей.
CountDownLatch
инициализированный с счетчиком один служит простой защелкой включения / выключения, или воротами: все вызывающие потоки ожидают в воротах, пока они не будут открыты потоком, вызывающим countDown().
CountDownLatch
инициализированный N может использоваться, чтобы заставить один поток ждать, пока N потоков не выполнили какое-либо действие, или некоторое действие не было выполнено N раз.
public void await()
throws InterruptedException
Заставляет текущий поток ждать, пока защелка не обратится к нулю, если поток не прерывается.
Если текущий счетчик равен нулю, этот метод возвращается немедленно.
public void countDown()
Уменьшает счетчик защелки, освобождая все ожидающие потоки, если счет достигает нуля.
Если текущий счетчик больше нуля, то он уменьшается. Если новый счетчик равен нулю, то все ожидающие потоки повторно включаются для целей планирования потоков.
Объяснение вашего примера.
Вы установили счет как 3 для
latch
переменнаяCountDownLatch latch = new CountDownLatch(3);
Вы прошли этот общий доступ
latch
Рабочему потоку:Processor
- Три
Runnable
случаиProcessor
были представленыExecutorService
executor
Основная тема (
App
) ожидает подсчета, чтобы стать нулем с оператором нижеlatch.await();
Processor
поток спит в течение 3 секунд, а затем уменьшает значение счетчика сlatch.countDown()
Первый
Process
экземпляр изменит количество защелок на 2 после его завершения из-заlatch.countDown()
,второй
Process
экземпляр изменит счетчик защелок на 1 после его завершения из-заlatch.countDown()
,В третьих
Process
Экземпляр изменит счетчик защелок на 0 после его завершения из-заlatch.countDown()
,Нулевой отсчет на защелке вызывает основной поток
App
выйти изawait
Приложение App теперь печатает этот вывод:
Completed
Один хороший пример того, когда использовать что-то подобное, - это Java Simple Serial Connector для доступа к последовательным портам. Обычно вы что-то записываете в порт, и асинхронно в другом потоке устройство отвечает SerialPortEventListener. Как правило, вы хотите сделать паузу после записи в порт, чтобы дождаться ответа. Ручная обработка потоков для этого сценария чрезвычайно сложна, но использовать Countdownlatch легко. Прежде чем думать, что вы можете сделать это по-другому, будьте осторожны с условиями гонки, о которых вы никогда не думали!!
псевдокод:
CountDownLatch latch; void writeData() { latch = new CountDownLatch(1); serialPort.writeBytes(sb.toString().getBytes()) try { latch.await(4, TimeUnit.SECONDS); } catch (InterruptedException e) { } } class SerialPortReader implements SerialPortEventListener { public void serialEvent(SerialPortEvent event) { if(event.isRXCHAR()){//If data is available byte buffer[] = serialPort.readBytes(event.getEventValue()); latch.countDown(); } } }
Если вы добавите отладку после вызова latch.countDown(), это может помочь вам лучше понять его поведение.
latch.countDown();
System.out.println("DONE "+this.latch); // Add this debug
Выходные данные покажут, что число уменьшается. Это 'count' - это фактически количество запущенных задач (объектов Processor), с которыми вы начали, countDown () не был вызван и, следовательно, блокируется основным потоком при вызове latch.await().
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 2]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 1]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 0]
package practice;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch c= new CountDownLatch(3); // need to decrements the count (3) to zero by calling countDown() method so that main thread will wake up after calling await() method
Task t = new Task(c);
Task t1 = new Task(c);
Task t2 = new Task(c);
t.start();
t1.start();
t2.start();
c.await(); // when count becomes zero main thread will wake up
System.out.println("This will print after count down latch count become zero");
}
}
class Task extends Thread{
CountDownLatch c;
public Task(CountDownLatch c) {
this.c = c;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000);
c.countDown(); // each thread decrement the count by one
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Лучший пример реального времени для countDownLatch, объясненный в этой ссылке CountDownLatchExample
Лучший вариант
CyclicBarrier
, согласно https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html См.:
CountDownLatch инициализируется заданным счетчиком. Методы await блокируются до тех пор, пока текущий счетчик не достигнет нуля из-за вызовов метода countDown(), после чего все ожидающие потоки освобождаются, а все последующие вызовы await возвращаются немедленно. Это одноразовое явление — счетчик не может быть сброшен. Если вам нужна версия, которая сбрасывает счетчик, рассмотрите возможность использования CyclicBarrier.