Java BlockingQueue блокировка на take(), с небольшим поворотом

У меня есть ситуация, когда у меня есть 2 блокирующие очереди. Сначала я вставляю некоторые задачи, которые я выполняю. Когда каждая задача завершается, она добавляет задачу во вторую очередь, где они выполняются.

Итак, моя первая очередь проста: я просто проверяю, чтобы убедиться, что она не пуста, и выполняю, иначе я прерываю ():

public void run() {
    try {
        if (taskQueue1.isEmpty()) {
            SomeTask task = taskQueue1.poll();
            doTask(task);
            taskQueue2.add(task);
        }
        else {
            Thread.currentThread().interrupt();
        }
    }

    catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

Второй я делаю следующее, что, как вы можете сказать, не работает:

public void run() {
    try {
        SomeTask2 task2 = taskQueue2.take();
        doTask(task2);
    }

    catch (InterruptedException ex) {

    }
    Thread.currentThread().interrupt();

}

Как бы вы решили это так, чтобы второе BlockingQueue не блокировалось при take(), а завершалось только тогда, когда узнало, что больше нет элементов, которые нужно добавить. Было бы хорошо, если бы 2-й поток мог видеть 1-ую очередь блокировки, возможно, и проверить, была ли она пуста, а 2-я очередь также пуста, тогда он прервется.

Я мог бы также использовать объект Poison, но предпочел бы что-то еще.

NB: Это не точный код, просто то, что я написал здесь:

2 ответа

Решение

Это звучит так, как будто поток, обрабатывающий первую очередь, знает, что больше задач не будет, как только его очередь будет очищена. Это звучит подозрительно, но я верю вам на слово и в любом случае предложу решение.

Определить AtomicInteger видны обеим нитям. Инициализируйте это к положительному.

Определите операцию первого потока следующим образом:

  • Цикл на Queue#poll(),
  • Если Queue#poll() возвращает ноль, вызов AtomicInteger#decrementAndGet() на общее целое число.
    • Если AtomicInteger#decrementAndGet() вернул ноль, прервите второй поток через Thread#interrupt(), (Это обрабатывает случай, когда никакие предметы никогда не поступали.)
    • В любом случае выйдите из цикла.
  • В противном случае обработайте извлеченный элемент, позвоните AtomicInteger#incrementAndGet() в общем целом числе добавьте извлеченный элемент в очередь второго потока и продолжите цикл.

Определите операцию второго потока следующим образом:

  • Блокировка цикла BlockingQueue#take(),
  • Если BlockingQueue#take() бросает InterruptedException, лови исключение, звони Thread.currentThread().interrupt() и выйдите из цикла.
  • В противном случае обработайте извлеченный элемент.
  • Вызов AtomicInteger#decrementAndGet() на общее целое число.
    • Если AtomicInteger#decrementAndGet() вернул ноль, выйдите из цикла.
    • В противном случае продолжите цикл.

Убедитесь, что вы понимаете идею, прежде чем пытаться написать реальный код. Контракт заключается в том, что второй поток продолжает ожидать больше элементов из своей очереди, пока число ожидаемых задач не достигнет нуля. В этот момент производящий поток (первый) больше не будет помещать какие-либо новые элементы в очередь второго потока, поэтому второй поток знает, что безопасно прекратить обслуживание своей очереди.

Винтовой случай возникает, когда никакие задачи никогда не достигают очереди первого потока. Поскольку второй поток только уменьшает и проверяет счетчик после того, как обработает элемент, если он никогда не получит возможность обработать какие-либо элементы, он никогда не будет рассматривать остановку. Мы используем прерывание потока для обработки этого случая за счет другого условного перехода на этапах завершения цикла первого потока. К счастью, эта ветка будет выполнена только один раз.

Есть много проектов, которые могли бы работать здесь. Я просто описал один, который ввел только одну дополнительную сущность - общее атомное целое число - но даже тогда это сложно. Я думаю, что использование таблеток с ядом было бы намного чище, хотя я признаю, что ни Queue#add() ни BlockingQueue#put() принять ноль в качестве допустимого элемента (из-за Queue#poll() возврат контракта). В противном случае было бы легко использовать ноль в качестве отравляющей таблетки.

Я не могу понять, что вы на самом деле пытаетесь сделать здесь, но я могу сказать, что interrupt() в вашем первом run() метод либо бессмысленный, либо неправильный.

  • Если вы работаете run() метод в вашем собственном Thread объект, тогда этот поток все равно собирается выйти, так что нет смысла прерывать его.

  • Если вы работаете run() метод в исполнителе с пулом потоков, тогда вы, скорее всего, не хотите уничтожать поток или вообще выключать исполнителя... в этот момент. И если вы хотите завершить работу исполнителя, вам следует вызвать один из его методов завершения работы.


Например, вот версия, которая делает то, что вы делаете без всяких прерываний и без создания / уничтожения потоков.

public class TaskExecutor {

    private ExecutorService executor = new ThreadPoolExecutorService(...);

    public void submitTask1(final SomeTask task) {
        executor.submit(new Runnable(){
            public void run() {
                doTask(task);
                submitTask2(task);
            }
        });
    }

    public void submitTask2(final SomeTask task) {
        executor.submit(new Runnable(){
            public void run() {
                doTask2(task);
            }
        });
    }

    public void shutdown() {
        executor.shutdown();
    }
 }

Если вы хотите создать отдельную очередь для задач, просто создайте и используйте двух разных исполнителей.

Другие вопросы по тегам