Гибкий CountDownLatch?
Я дважды сталкивался с проблемой, когда поток-производитель создает N рабочих элементов, отправляет их ExecutorService
и затем необходимо дождаться, пока все N элементов будут обработаны.
Предостережения
- N не известен заранее. Если бы это было так, я бы просто создал
CountDownLatch
а потом есть продюсерская нитьawait()
пока все работы не будут завершены. - Используя
CompletionService
неуместно, потому что, хотя мой поток производителя должен блокировать (т.е. путем вызоваtake()
) нет никакого способа сообщить, что вся работа завершена, чтобы заставить поток производителя перестать ждать.
Мое предпочитаемое в настоящее время решение состоит в том, чтобы использовать счетчик целых чисел и увеличивать его всякий раз, когда передается элемент работы, и уменьшать его при обработке рабочего элемента. После отправки всех N задач моему производственному потоку нужно будет ждать блокировки, проверяя, counter == 0
всякий раз, когда это уведомлено. Поток (и) потребителя должен будет уведомить производителя, если он уменьшил счетчик и новое значение равно 0.
Есть ли лучший подход к этой проблеме или есть подходящая конструкция в java.util.concurrent
Я должен использовать вместо того, чтобы "катиться самостоятельно"?
Заранее спасибо.
6 ответов
java.util.concurrent.Phaser
Похоже, это будет хорошо для вас. Его планируется выпустить в Java 7, но наиболее стабильную версию можно найти на веб -сайте группы интересов jsr166.
Фазер является прославленным циклическим барьером. Вы можете зарегистрировать N партий и когда будете готовы ждать их продвижения на конкретном этапе.
Быстрый пример того, как это будет работать:
final Phaser phaser = new Phaser();
public Runnable getRunnable(){
return new Runnable(){
public void run(){
..do stuff...
phaser.arriveAndDeregister();
}
};
}
public void doWork(){
phaser.register();//register self
for(int i=0 ; i < N; i++){
phaser.register(); // register this task prior to execution
executor.submit( getRunnable());
}
phaser.arriveAndAwaitAdvance();
}
Вы могли бы, конечно, использовать CountDownLatch
защищен AtomicReference
так что ваши задачи обернуты так:
public class MyTask extends Runnable {
private final Runnable r;
public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; }
public void run() {
r.run();
while (l.get() == null) Thread.sleep(1000L); //handle Interrupted
l.get().countDown();
}
}
Обратите внимание, что задачи выполняют свою работу, а затем вращаются, пока не будет установлен обратный отсчет (т. Е. Известно общее количество задач). Как только обратный отсчет установлен, они считают его и выходят. Они представлены следующим образом:
AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, l));
После точки создания / представления вашей работы, когда вы знаете, сколько задач вы создали:
latch.set(new CountDownLatch(nTasks));
latch.get().await();
Я использовал ExecutorCompletionService для чего-то вроде этого:
ExecutorCompletionService executor = ...;
int count = 0;
while (...) {
executor.submit(new Processor());
count++;
}
//Now, pull the futures out of the queue:
for (int i = 0; i < count; i++) {
executor.take().get();
}
Это включает в себя сохранение очереди задач, которые были отправлены, поэтому, если ваш список произвольно длинный, ваш метод может быть лучше.
Но обязательно используйте AtomicInteger
для координации, так что вы сможете увеличить его в один поток, и уменьшить его в рабочих потоках.
Я предполагаю, что вашему производителю не нужно знать, когда очередь пуста, но нужно знать, когда было выполнено последнее задание.
Я бы добавил waitforWorkDone(producer)
метод для потребителя. Производитель может добавить свои N задач и вызвать метод wait. Метод wait блокирует входящий поток, если рабочая очередь не пуста и в настоящий момент не выполняется никаких задач.
Потребительские темы notifyAll()
в ожидании блокировки, если его задача завершена, очередь пуста, и никакая другая задача не выполняется.
То, что вы описали, очень похоже на использование стандартного семафора, но используется "назад".
- Ваш семафор начинается с 0 разрешений
- Каждое рабочее подразделение выдает одно разрешение по завершении
- Вы блокируете, ожидая, чтобы получить N разрешений
Кроме того, у вас есть возможность приобретать только разрешения M
Автономный метод Java 8+, который делает именно это для Stream
используя один проход Phaser
, Iterator
/Iterable
варианты точно такие же.
public static <X> int submitAndWait(Stream<X> items, Consumer<X> handleItem, Executor executor) {
Phaser phaser = new Phaser(1); // 1 = register ourselves
items.forEach(item -> {
phaser.register(); // new task
executor.execute(() -> {
handleItem.accept(item);
phaser.arrive(); // completed task
});
});
phaser.arriveAndAwaitAdvance(); // block until all tasks are complete
return phaser.getRegisteredParties()-1; // number of items
}
...
int recognised = submitAndWait(facesInPicture, this::detectFace, executor)
FYI. Это хорошо для особых событий, но, если это вызывается одновременно, решение, включающее ForkJoinPool
остановит родительский поток от блокировки.