Гибкий CountDownLatch?

Я дважды сталкивался с проблемой, когда поток-производитель создает N рабочих элементов, отправляет их ExecutorService и затем необходимо дождаться, пока все N элементов будут обработаны.

Предостережения

  • N не известен заранее. Если бы это было так, я бы просто создал CountDownLatch а потом есть продюсерская нить await() пока все работы не будут завершены.
  • Используя CompletionService неуместно, потому что, хотя мой поток производителя должен блокировать (т.е. путем вызова take()) нет никакого способа сообщить, что вся работа завершена, чтобы заставить поток производителя перестать ждать.

Мое предпочитаемое в настоящее время решение состоит в том, чтобы использовать счетчик целых чисел и увеличивать его всякий раз, когда передается элемент работы, и уменьшать его при обработке рабочего элемента. После отправки всех N задач моему производственному потоку нужно будет ждать блокировки, проверяя, counter == 0 всякий раз, когда это уведомлено. Поток (и) потребителя должен будет уведомить производителя, если он уменьшил счетчик и новое значение равно 0.

Есть ли лучший подход к этой проблеме или есть подходящая конструкция в java.util.concurrent Я должен использовать вместо того, чтобы "катиться самостоятельно"?

Заранее спасибо.

6 ответов

Решение

java.util.concurrent.PhaserПохоже, это будет хорошо для вас. Его планируется выпустить в Java 7, но наиболее стабильную версию можно найти на веб -сайте группы интересов jsr166.

Фазер является прославленным циклическим барьером. Вы можете зарегистрировать N партий и когда будете готовы ждать их продвижения на конкретном этапе.

Быстрый пример того, как это будет работать:

final Phaser phaser = new Phaser();

public Runnable getRunnable(){
    return new Runnable(){
        public void run(){
            ..do stuff...
            phaser.arriveAndDeregister();
        }
    };
}
public void doWork(){
    phaser.register();//register self
    for(int i=0 ; i < N; i++){
        phaser.register(); // register this task prior to execution 
        executor.submit( getRunnable());
    }
    phaser.arriveAndAwaitAdvance();
}

Вы могли бы, конечно, использовать CountDownLatch защищен AtomicReference так что ваши задачи обернуты так:

public class MyTask extends Runnable {
    private final Runnable r;
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; }

    public void run() {
        r.run();
        while (l.get() == null) Thread.sleep(1000L); //handle Interrupted
        l.get().countDown();
    }
}

Обратите внимание, что задачи выполняют свою работу, а затем вращаются, пока не будет установлен обратный отсчет (т. Е. Известно общее количество задач). Как только обратный отсчет установлен, они считают его и выходят. Они представлены следующим образом:

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, l));

После точки создания / представления вашей работы, когда вы знаете, сколько задач вы создали:

latch.set(new CountDownLatch(nTasks));
latch.get().await();

Я использовал ExecutorCompletionService для чего-то вроде этого:

ExecutorCompletionService executor = ...;
int count = 0;
while (...) {
    executor.submit(new Processor());
    count++;
}

//Now, pull the futures out of the queue:
for (int i = 0; i < count; i++) {
    executor.take().get();
}

Это включает в себя сохранение очереди задач, которые были отправлены, поэтому, если ваш список произвольно длинный, ваш метод может быть лучше.

Но обязательно используйте AtomicInteger для координации, так что вы сможете увеличить его в один поток, и уменьшить его в рабочих потоках.

Я предполагаю, что вашему производителю не нужно знать, когда очередь пуста, но нужно знать, когда было выполнено последнее задание.

Я бы добавил waitforWorkDone(producer) метод для потребителя. Производитель может добавить свои N задач и вызвать метод wait. Метод wait блокирует входящий поток, если рабочая очередь не пуста и в настоящий момент не выполняется никаких задач.

Потребительские темы notifyAll() в ожидании блокировки, если его задача завершена, очередь пуста, и никакая другая задача не выполняется.

То, что вы описали, очень похоже на использование стандартного семафора, но используется "назад".

  • Ваш семафор начинается с 0 разрешений
  • Каждое рабочее подразделение выдает одно разрешение по завершении
  • Вы блокируете, ожидая, чтобы получить N разрешений

Кроме того, у вас есть возможность приобретать только разрешения M

Автономный метод Java 8+, который делает именно это для Stream используя один проход Phaser, Iterator/Iterable варианты точно такие же.

public static <X> int submitAndWait(Stream<X> items, Consumer<X> handleItem, Executor executor) {
    Phaser phaser = new Phaser(1); // 1 = register ourselves
    items.forEach(item -> {
            phaser.register(); // new task
            executor.execute(() -> {
                handleItem.accept(item);
                phaser.arrive(); // completed task
            });
    });
    phaser.arriveAndAwaitAdvance(); // block until all tasks are complete
    return phaser.getRegisteredParties()-1; // number of items
}
...
int recognised = submitAndWait(facesInPicture, this::detectFace, executor)

FYI. Это хорошо для особых событий, но, если это вызывается одновременно, решение, включающее ForkJoinPool остановит родительский поток от блокировки.

Другие вопросы по тегам