Java: два WAITING + один BLOCKED потоки, notify() приводит к livelock, notifyAll() нет, почему?
Я пытался реализовать нечто похожее на ограниченный интерфейс Java BlockingQueue, используя "примитивы" синхронизации Java (synchronized, wait (), notify ()), когда наткнулся на какое-то поведение, которое я не понимаю.
Я создаю очередь, способную хранить 1 элемент, создаю два потока, которые ожидают извлечения значения из очереди, запускают их, а затем пытаются поместить два значения в очередь в синхронизированном блоке в главном потоке. Большую часть времени это работает, но иногда два потока, ожидающие значения, начинают как бы разбудить друг друга и не дать основному потоку войти в синхронизированный блок.
Вот мой (упрощенный) код:
import java.util.LinkedList;
import java.util.Queue;
public class LivelockDemo {
private static final int MANY_RUNS = 10000;
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < MANY_RUNS; i++) { // to increase the probability
final MyBoundedBlockingQueue ctr = new MyBoundedBlockingQueue(1);
Thread t1 = createObserver(ctr, i + ":1");
Thread t2 = createObserver(ctr, i + ":2");
t1.start();
t2.start();
System.out.println(i + ":0 ready to enter synchronized block");
synchronized (ctr) {
System.out.println(i + ":0 entered synchronized block");
ctr.addWhenHasSpace("hello");
ctr.addWhenHasSpace("world");
}
t1.join();
t2.join();
System.out.println();
}
}
public static class MyBoundedBlockingQueue {
private Queue<Object> lst = new LinkedList<Object>();;
private int limit;
private MyBoundedBlockingQueue(int limit) {
this.limit = limit;
}
public synchronized void addWhenHasSpace(Object obj) throws InterruptedException {
boolean printed = false;
while (lst.size() >= limit) {
printed = __heartbeat(':', printed);
notify();
wait();
}
lst.offer(obj);
notify();
}
// waits until something has been set and then returns it
public synchronized Object getWhenNotEmpty() throws InterruptedException {
boolean printed = false;
while (lst.isEmpty()) {
printed = __heartbeat('.', printed); // show progress
notify();
wait();
}
Object result = lst.poll();
notify();
return result;
}
// just to show progress of waiting threads in a reasonable manner
private static boolean __heartbeat(char c, boolean printed) {
long now = System.currentTimeMillis();
if (now % 1000 == 0) {
System.out.print(c);
printed = true;
} else if (printed) {
System.out.println();
printed = false;
}
return printed;
}
}
private static Thread createObserver(final MyBoundedBlockingQueue ctr,
final String name) {
return new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(name + ": saw " + ctr.getWhenNotEmpty());
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
}, name);
}
}
Вот что я вижу, когда он "блокирует":
(skipped a lot)
85:0 ready to enter synchronized block
85:0 entered synchronized block
85:2: saw hello
85:1: saw world
86:0 ready to enter synchronized block
86:0 entered synchronized block
86:2: saw hello
86:1: saw world
87:0 ready to enter synchronized block
............................................
..........................................................................
..................................................................................
(goes "forever")
Однако если я изменю вызовы notify () внутри циклов while(...) методов addWhenHasSpace и getWhenNotEmpty на notifyAll(), он "всегда" проходит.
У меня такой вопрос: почему в этом случае поведение между методами notify () и notifyAll () меняется, а также почему поведение notify () так и есть?
Я ожидал бы, что оба метода будут вести себя одинаково в этом случае (два потока ожидают, один заблокирован), потому что:
- мне кажется, что в этом случае notifyAll () будет пробуждать только другой поток, так же, как notify();
- похоже, что выбор метода, который пробуждает поток, влияет на то, как проснувшийся поток (и, я полагаю, становится RUNNABLE) и основной поток (который был заблокирован) позже конкурируют за блокировку - не то, чего я ожидал бы от Javadoc, а также поиск в Интернете по этой теме.
А может я вообще что-то не так делаю?
2 ответа
Похоже, что с помощью внутренней блокировки происходит какая-то справедливость / угрызения совести - возможно, из-за некоторой оптимизации. Я предполагаю, что нативный код проверяет, уведомил ли текущий поток монитор о своем ожидании и разрешил ли ему выиграть.
Заменить synchronized
с ReentrantLock
и это должно работать так, как вы ожидаете. Различия здесь в том, как ReentrantLock
обрабатывает официантов замка, о котором он уведомил.
Обновить:
Интересная находка здесь. То, что вы видите, это гонка между main
вход потока
synchronized (ctr) {
System.out.println(i + ":0 entered synchronized block");
ctr.addWhenHasSpace("hello");
ctr.addWhenHasSpace("world");
}
в то время как другие два потока вводят свои соответствующие synchronized
регионы. Если основной поток не попадает в свою область синхронизации раньше, чем хотя бы один из двух, у вас возникнет описанный вами выход из режима live-lock.
Похоже, происходит то, что если оба потока потребителя сначала попадут в блок синхронизации, они будут пинг-понг друг с другом для notify
а также wait
, Это может быть тот случай, когда JVM отдает потоки, которые ожидают приоритет для монитора, пока потоки заблокированы.
Не заглядывая слишком глубоко в ваш код, я вижу, что вы используете одну переменную условия для реализации очереди с одним производителем и несколькими потребителями. Это рецепт для проблемы: если есть только одна переменная условия, то когда потребитель вызывает notify()
нет способа узнать, разбудит ли он производителя или разбудит другого потребителя.
Есть два выхода из этой ловушки: самый простой - всегда использовать notifyAll().
Другой способ - прекратить использование synchronized
, wait()
, а также notify()
и вместо этого используйте средства в java.util.concurrent.locks.
Один объект ReentrantLock может дать вам две (или более) переменные условия. Используйте один исключительно для производителя, чтобы уведомить потребителей, и используйте другой исключительно для потребителей, чтобы уведомить производителя.
Примечание. Имена меняются, когда вы переключаетесь на использование ReentrantLocks: o.wait()
становится c.await()
, а также o.notify()
становится c.signal()
,