Правильное использование ReentrantReadWriteLock при передаче сигналов от писателя к читателю?

Шаблон использования возник по следующим причинам:

  1. Мне нужно прочитать поток, чтобы дождаться данных, если они отсутствуют с условиями.

  2. Блокировка чтения не поддерживает условие, поэтому условие должно быть взято из блокировки записи.

  3. Поскольку поток чтения будет ожидать выполнения условия, он также должен заблокировать запись.

У меня есть следующее определение блокировок в классе:

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final Lock readLock = rwl.readLock();
protected final Lock writeLock = rwl.writeLock();
protected final Condition hasData = writeLock.newCondition();

В моем методе писателя у меня есть следующий шаблон:

try {
   writeLock.lock();    

   //...

   if( something_written ) {
      hasData.signalAll();
   }

}
finally {
   writeLock.unlock();
}

В моем методе чтения у меня есть следующий шаблон

try {
   readLock.lock();    

   while( data_absent ) {

      // I need to acquire write lock to wait for condition!
      try {

          // first releasing read lock since we can't acquire write lock otherwise
          // unfortunately this won't release a lock if it was acquired more than once (reentrant)
          readLock.unlock();

          // acquiring write lock to wait it's condition
          writeLock.lock();
          hasData.await(1000, TimeUnit.MILLISECONDS);
      }
      finally {

          // releasing write lock back
          writeLock.unlock();

          // reacquiring read lock
          // again see note about reentrancy
          readLock.lock();
      }


   }

   // reading

}
finally {
   readLock.unlock();
}

Правильная ли картина выше?

Проблема заключается в том, что если читатель повторно входит, то есть блокирует чтение более одного раза, то освобождающий код не работает, и считыватель зависает на линии получения блокировки записи.

5 ответов

Это похоже на классический образец производителя / потребителя, поэтому я предлагаю вам взглянуть на существующие структуры данных для этой цели, например, на реализацию BlockingQueue.

Нить производителя put() данные в очереди, потребительские потоки take() данные из очереди.

Ручная синхронизация / блокировка всегда должны быть последним средством.

Ваш шаблон использования неверен: читатель должен задействовать только блокировку чтения; то же самое для писателя. Семантика такова: многие читатели могут получить блокировку чтения одновременно, если блокировка записи свободна; Писатель может получить блокировку записи только в том случае, если не получена другая блокировка (либо чтение, либо запись)

В своем коде писателя вы пытаетесь получить блокировку чтения, пока вы все еще удерживаете блокировку записи; аналогично для читателя кода.

Вот что я бы сделал в вашей ситуации:

private final ReentrantReadWriteLock    rwl         = new ReentrantReadWriteLock();
protected final Lock                    readLock    = rwl.readLock();
protected final Lock                    writeLock   = rwl.writeLock();
protected final Condition               hasData     = writeLock.newCondition();


public void write() {

    writeLock.lock();
    try {
        // write data
        // ...
        if (something_written) {
            hasData.signalAll();
        }
    }
    finally {
        writeLock.unlock();
    }
}

// replace Object by something else
public Object read() throws InterruptedException {

    Object data = tryRead();

    while (data == null) {
        waitForData();
        data = tryRead();
    }

    return data;
}

// replace Object by something else
private Object tryRead() {

    readLock.lock();
    try {
        Object data = null;
        // read data
        // ...
        // if there no data available, return null
        return data;
    }
    finally {
        readLock.unlock();
    }
}

private void waitForData() throws InterruptedException {

    writeLock.lock();
    try {
        boolean data_available = // check data
        while (!data_available) {
            hasData.await(1000L, TimeUnit.MILLISECONDS);
            data_available = // check data
        }
    }
    finally {
        writeLock.unlock();
    }
}


Это то же поведение, что и в типичном случае использования ReadWriteLock, если есть доступные данные для чтения. Если данных не существует, считыватель становится "писателем" (в смысле блокировки) и ждет, пока некоторые данные не станут доступны. Цикл повторяется до тех пор, пока не будут возвращены некоторые доступные данные (или пока не произойдет прерывание).


Поскольку вы используете ReadWriteLock, это означает, что вы ожидаете гораздо большего числа операций чтения, чем операций записи, и поэтому вы выбрали блокировку, минимизирующую конфликт между потоками чтения (readLock).

Метод waitForData() превращает читателей в "писателей", потому что вместо этого они блокируют writeLock, что приводит к увеличению конкуренции между всеми потоками (читателями и писателями). Однако, поскольку предполагается, что записи выполняются намного реже, чем чтения, ситуация, при которой данные продолжают быстро переключаться между "доступным" и "недоступным", не ожидается. Другими словами, предполагаемые записи редки:

  • Если нет доступных данных для чтения, то практически все читатели, как правило, блокируют метод waitForData() через некоторое время, и все будут уведомлены одновременно, когда записываются некоторые новые данные.

  • Если есть некоторые доступные данные для чтения, то все читатели будут просто читать их, не создавая никакого конфликта между потоками при блокировке readLock.

Я думаю, что вы пытаетесь сделать, это заставить вашего читателя ждать писателя, чтобы написать, а затем вернуть некоторое значение. Если нет никакой ценности, вы хотите, чтобы ваш читательский поток просто ждал или спал. Это верно? Если то, что я понял, правильно, вот один из способов сделать это.

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final Lock readLock = rwl.readLock();
protected final Lock writeLock = rwl.writeLock();
protected final Condition hasData = writeLock.newCondition();
private HashMap myData = new HashMap(); //example structure to read and write

private final ReentrantLock dataArrivalLock = new ReentrantLock();
private final Condition dataArrivalSignal = dataArrivalLock.newCondition();

Ваш шаблон метода писателя:

try {
   writeLock.lock();    

   //...
   myData.put("foo","ffoo"); //write something !!
   if( something_written ) {
      hasData.signalAll();
   }

}
finally {
   writeLock.unlock();
}
  try {
                //signal other threads that data has been put in
                dataArrivalLock.lock();
                dataArrivalSignal.signalAll();

            } finally {
                dataArrivalLock.unlock();
            }

Ваш шаблон метода чтения

try {
            boolean gotData = false;
            while (!gotData) {
                try {
                    readLock.lock();
                    if (myData.size() > 0) {
                        gotData = true;
                        //retrieve the data that is written by writer thred!!
                        myData.get("foo");
                    }
                } finally {
                    readLock.unlock();
                }
                if(!gotData) {
 //sleep the reader thread for x milliseconds. x depends on your application requirement
                  //   Thread.sleep(250);
                    try {
                        //instead of Thread.sleep(), use the dataArrivalLock signal to wakeup
                        dataArrivalLock.lock();
                        dataArrivalSignal.await();
                        //based on how the application works a timed wait might be better !!
                        //dataArrivalSignal.await(250);
                    } finally {
                        dataArrivalLock.unlock();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } 

Это приводит к тому, что поток чтения переходит в спящий режим до тех пор, пока потоком записи не будут записаны некоторые данные.

(Вместо Thread.sleep(250) вы могли бы также использовать дополнительную блокировку ч / б чтения и записи для того же)

Как насчет следующего подхода (комментарии в коде):

public class ReadWrite
{
    private final Lock readLock;
    private final Lock writeLock;
    private final Condition condition;

    {
        ReadWriteLock rwl = new ReentrantReadWriteLock ();
        readLock = rwl.readLock ();
        writeLock = rwl.writeLock ();
        condition = writeLock.newCondition ();
    }

    private Object data;

    // If data is there, return it, otherwise, return null
    private Object tryGetData ()
    {
        readLock.lock ();
        try
        {
            return data; // May return null
        }
        finally
        {
            readLock.unlock ();
        }
    }

    // Wait for data if necessary and then return it
    private Object doGetData () throws InterruptedException
    {
        writeLock.lock ();
        try
        {
            while (data == null)
                condition.await ();

            return data;
        }
        finally
        {
            writeLock.unlock ();
        }
    }

    // Used by reader, return value cannot be null, may block
    public Object getData () throws InterruptedException
    {
        Object result = tryGetData ();
        return result == null ? doGetData () : result;
    }

    // Used by writer, data may be null
    public void setData (Object data)
    {
        writeLock.lock ();
        try
        {
            Object previousData = this.data;
            this.data = data;
            if (previousData == null && data != null)
                condition.notifyAll ();
        }
        finally
        {
            writeLock.unlock ();
        }
    }
}
Другие вопросы по тегам