Java BlockingQueue блокировка на take(), с небольшим поворотом
У меня есть ситуация, когда у меня есть 2 блокирующие очереди. Сначала я вставляю некоторые задачи, которые я выполняю. Когда каждая задача завершается, она добавляет задачу во вторую очередь, где они выполняются.
Итак, моя первая очередь проста: я просто проверяю, чтобы убедиться, что она не пуста, и выполняю, иначе я прерываю ():
public void run() {
try {
if (taskQueue1.isEmpty()) {
SomeTask task = taskQueue1.poll();
doTask(task);
taskQueue2.add(task);
}
else {
Thread.currentThread().interrupt();
}
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
}
Второй я делаю следующее, что, как вы можете сказать, не работает:
public void run() {
try {
SomeTask2 task2 = taskQueue2.take();
doTask(task2);
}
catch (InterruptedException ex) {
}
Thread.currentThread().interrupt();
}
Как бы вы решили это так, чтобы второе BlockingQueue не блокировалось при take(), а завершалось только тогда, когда узнало, что больше нет элементов, которые нужно добавить. Было бы хорошо, если бы 2-й поток мог видеть 1-ую очередь блокировки, возможно, и проверить, была ли она пуста, а 2-я очередь также пуста, тогда он прервется.
Я мог бы также использовать объект Poison, но предпочел бы что-то еще.
NB: Это не точный код, просто то, что я написал здесь:
2 ответа
Это звучит так, как будто поток, обрабатывающий первую очередь, знает, что больше задач не будет, как только его очередь будет очищена. Это звучит подозрительно, но я верю вам на слово и в любом случае предложу решение.
Определить AtomicInteger
видны обеим нитям. Инициализируйте это к положительному.
Определите операцию первого потока следующим образом:
- Цикл на
Queue#poll()
, - Если
Queue#poll()
возвращает ноль, вызовAtomicInteger#decrementAndGet()
на общее целое число.- Если
AtomicInteger#decrementAndGet()
вернул ноль, прервите второй поток черезThread#interrupt()
, (Это обрабатывает случай, когда никакие предметы никогда не поступали.) - В любом случае выйдите из цикла.
- Если
- В противном случае обработайте извлеченный элемент, позвоните
AtomicInteger#incrementAndGet()
в общем целом числе добавьте извлеченный элемент в очередь второго потока и продолжите цикл.
Определите операцию второго потока следующим образом:
- Блокировка цикла
BlockingQueue#take()
, - Если
BlockingQueue#take()
бросаетInterruptedException
, лови исключение, звониThread.currentThread().interrupt()
и выйдите из цикла. - В противном случае обработайте извлеченный элемент.
- Вызов
AtomicInteger#decrementAndGet()
на общее целое число.- Если
AtomicInteger#decrementAndGet()
вернул ноль, выйдите из цикла. - В противном случае продолжите цикл.
- Если
Убедитесь, что вы понимаете идею, прежде чем пытаться написать реальный код. Контракт заключается в том, что второй поток продолжает ожидать больше элементов из своей очереди, пока число ожидаемых задач не достигнет нуля. В этот момент производящий поток (первый) больше не будет помещать какие-либо новые элементы в очередь второго потока, поэтому второй поток знает, что безопасно прекратить обслуживание своей очереди.
Винтовой случай возникает, когда никакие задачи никогда не достигают очереди первого потока. Поскольку второй поток только уменьшает и проверяет счетчик после того, как обработает элемент, если он никогда не получит возможность обработать какие-либо элементы, он никогда не будет рассматривать остановку. Мы используем прерывание потока для обработки этого случая за счет другого условного перехода на этапах завершения цикла первого потока. К счастью, эта ветка будет выполнена только один раз.
Есть много проектов, которые могли бы работать здесь. Я просто описал один, который ввел только одну дополнительную сущность - общее атомное целое число - но даже тогда это сложно. Я думаю, что использование таблеток с ядом было бы намного чище, хотя я признаю, что ни Queue#add()
ни BlockingQueue#put()
принять ноль в качестве допустимого элемента (из-за Queue#poll()
возврат контракта). В противном случае было бы легко использовать ноль в качестве отравляющей таблетки.
Я не могу понять, что вы на самом деле пытаетесь сделать здесь, но я могу сказать, что interrupt()
в вашем первом run()
метод либо бессмысленный, либо неправильный.
Если вы работаете
run()
метод в вашем собственномThread
объект, тогда этот поток все равно собирается выйти, так что нет смысла прерывать его.Если вы работаете
run()
метод в исполнителе с пулом потоков, тогда вы, скорее всего, не хотите уничтожать поток или вообще выключать исполнителя... в этот момент. И если вы хотите завершить работу исполнителя, вам следует вызвать один из его методов завершения работы.
Например, вот версия, которая делает то, что вы делаете без всяких прерываний и без создания / уничтожения потоков.
public class TaskExecutor {
private ExecutorService executor = new ThreadPoolExecutorService(...);
public void submitTask1(final SomeTask task) {
executor.submit(new Runnable(){
public void run() {
doTask(task);
submitTask2(task);
}
});
}
public void submitTask2(final SomeTask task) {
executor.submit(new Runnable(){
public void run() {
doTask2(task);
}
});
}
public void shutdown() {
executor.shutdown();
}
}
Если вы хотите создать отдельную очередь для задач, просто создайте и используйте двух разных исполнителей.