Java: два WAITING + один BLOCKED потоки, notify() приводит к livelock, notifyAll() нет, почему?

Я пытался реализовать нечто похожее на ограниченный интерфейс Java BlockingQueue, используя "примитивы" синхронизации Java (synchronized, wait (), notify ()), когда наткнулся на какое-то поведение, которое я не понимаю.

Я создаю очередь, способную хранить 1 элемент, создаю два потока, которые ожидают извлечения значения из очереди, запускают их, а затем пытаются поместить два значения в очередь в синхронизированном блоке в главном потоке. Большую часть времени это работает, но иногда два потока, ожидающие значения, начинают как бы разбудить друг друга и не дать основному потоку войти в синхронизированный блок.

Вот мой (упрощенный) код:

import java.util.LinkedList;
import java.util.Queue;

public class LivelockDemo {
    private static final int MANY_RUNS = 10000;

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < MANY_RUNS; i++) { // to increase the probability
            final MyBoundedBlockingQueue ctr = new MyBoundedBlockingQueue(1);

            Thread t1 = createObserver(ctr, i + ":1");
            Thread t2 = createObserver(ctr, i + ":2");

            t1.start();
            t2.start();

            System.out.println(i + ":0 ready to enter synchronized block");
            synchronized (ctr) {
                System.out.println(i + ":0 entered synchronized block");
                ctr.addWhenHasSpace("hello");
                ctr.addWhenHasSpace("world");
            }

            t1.join();
            t2.join();

            System.out.println();
        }
    }

    public static class MyBoundedBlockingQueue {
        private Queue<Object> lst = new LinkedList<Object>();;

        private int limit;

        private MyBoundedBlockingQueue(int limit) {
            this.limit = limit;
        }

        public synchronized void addWhenHasSpace(Object obj) throws InterruptedException {
            boolean printed = false;
            while (lst.size() >= limit) {
                printed = __heartbeat(':', printed);
                notify();
                wait();
            }
            lst.offer(obj);
            notify();
        }

        // waits until something has been set and then returns it
        public synchronized Object getWhenNotEmpty() throws InterruptedException {
            boolean printed = false;
            while (lst.isEmpty()) {
                printed = __heartbeat('.', printed); // show progress
                notify();
                wait();
            }
            Object result = lst.poll();
            notify();
            return result;
        }

        // just to show progress of waiting threads in a reasonable manner
        private static boolean __heartbeat(char c, boolean printed) {
            long now = System.currentTimeMillis();
            if (now % 1000 == 0) {
                System.out.print(c);
                printed = true;
            } else if (printed) {
                System.out.println();
                printed = false;
            }
            return printed;
        }
    }

    private static Thread createObserver(final MyBoundedBlockingQueue ctr,
            final String name) {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(name + ": saw " + ctr.getWhenNotEmpty());
                } catch (InterruptedException e) {
                    e.printStackTrace(System.err);
                }
            }
        }, name);
    }
}

Вот что я вижу, когда он "блокирует":

(skipped a lot)

85:0 ready to enter synchronized block
85:0 entered synchronized block
85:2: saw hello
85:1: saw world

86:0 ready to enter synchronized block
86:0 entered synchronized block
86:2: saw hello
86:1: saw world

87:0 ready to enter synchronized block
............................................

..........................................................................

..................................................................................
(goes "forever")

Однако если я изменю вызовы notify () внутри циклов while(...) методов addWhenHasSpace и getWhenNotEmpty на notifyAll(), он "всегда" проходит.

У меня такой вопрос: почему в этом случае поведение между методами notify () и notifyAll () меняется, а также почему поведение notify () так и есть?

Я ожидал бы, что оба метода будут вести себя одинаково в этом случае (два потока ожидают, один заблокирован), потому что:

  1. мне кажется, что в этом случае notifyAll () будет пробуждать только другой поток, так же, как notify();
  2. похоже, что выбор метода, который пробуждает поток, влияет на то, как проснувшийся поток (и, я полагаю, становится RUNNABLE) и основной поток (который был заблокирован) позже конкурируют за блокировку - не то, чего я ожидал бы от Javadoc, а также поиск в Интернете по этой теме.

А может я вообще что-то не так делаю?

2 ответа

Похоже, что с помощью внутренней блокировки происходит какая-то справедливость / угрызения совести - возможно, из-за некоторой оптимизации. Я предполагаю, что нативный код проверяет, уведомил ли текущий поток монитор о своем ожидании и разрешил ли ему выиграть.

Заменить synchronized с ReentrantLock и это должно работать так, как вы ожидаете. Различия здесь в том, как ReentrantLock обрабатывает официантов замка, о котором он уведомил.


Обновить:

Интересная находка здесь. То, что вы видите, это гонка между main вход потока

        synchronized (ctr) {
            System.out.println(i + ":0 entered synchronized block");
            ctr.addWhenHasSpace("hello");
            ctr.addWhenHasSpace("world");
        }

в то время как другие два потока вводят свои соответствующие synchronized регионы. Если основной поток не попадает в свою область синхронизации раньше, чем хотя бы один из двух, у вас возникнет описанный вами выход из режима live-lock.

Похоже, происходит то, что если оба потока потребителя сначала попадут в блок синхронизации, они будут пинг-понг друг с другом для notify а также wait, Это может быть тот случай, когда JVM отдает потоки, которые ожидают приоритет для монитора, пока потоки заблокированы.

Не заглядывая слишком глубоко в ваш код, я вижу, что вы используете одну переменную условия для реализации очереди с одним производителем и несколькими потребителями. Это рецепт для проблемы: если есть только одна переменная условия, то когда потребитель вызывает notify()нет способа узнать, разбудит ли он производителя или разбудит другого потребителя.

Есть два выхода из этой ловушки: самый простой - всегда использовать notifyAll().

Другой способ - прекратить использование synchronized, wait(), а также notify()и вместо этого используйте средства в java.util.concurrent.locks.

Один объект ReentrantLock может дать вам две (или более) переменные условия. Используйте один исключительно для производителя, чтобы уведомить потребителей, и используйте другой исключительно для потребителей, чтобы уведомить производителя.

Примечание. Имена меняются, когда вы переключаетесь на использование ReentrantLocks: o.wait() становится c.await(), а также o.notify() становится c.signal(),

Другие вопросы по тегам