Замена буферов в потоках с одним записывающим и несколькими читателями

История

Есть поток писателя, периодически собирающий данные откуда-то (в режиме реального времени, но это не имеет большого значения в этом вопросе). Тогда многие читатели читают по этим данным. Обычное решение для этого с двумя блокировками читателя-писателя и двумя буферами как это:

Writer (case 1):
acquire lock 0                        
loop
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period

Или же

Writer (case 2):
acquire lock 0                        
loop
    acquire other lock
    free this lock
    swap buffers
    write to current buffer
    wait for next period

Эта проблема

В обоих методах, если операция получения другой блокировки завершится неудачно, перестановка не будет выполнена, и модуль записи перезапишет свои предыдущие данные (поскольку модуль записи работает в режиме реального времени, он не может ждать читателей). Таким образом, в этом случае все читатели потеряют этот кадр. данных.

Это не такая уж большая проблема, читатели - мой собственный код, и они короткие, поэтому с двойным буфером эта проблема решена, и если бы возникла проблема, я мог бы сделать это тройным буфером (или больше).

Проблема в задержке, которую я хочу минимизировать. Представьте себе случай 1:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up, and again writes to buffer0

В этот момент ** другие теоретически читатели могли прочитать данные buffer0 если бы только автор мог сделать обмен после окончания чтения, вместо того, чтобы ждать его следующего периода. В этом случае произошло то, что только из-за того, что один читатель немного опоздал, все читатели пропустили один кадр данных, в то время как проблему можно было полностью избежать.

Случай 2 похож:

writer writes to buffer0                reader is idle
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)
|
|                                       reader starts reading buffer1
writer wakes up                         |
it can't acquire lock0                  because reader is still reading buffer1
overwrites buffer0

Я попытался смешать решения, поэтому писатель пытается поменять буферы сразу после записи и, если это невозможно, сразу после пробуждения в следующем периоде. Так что-то вроде этого:

Writer (case 3):
acquire lock 0                        
loop
    if last buffer swap failed
        acquire other lock
        free this lock
        swap buffers
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period

Теперь проблема с задержкой сохраняется:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up
swaps buffers
writes to buffer1

Снова в этот момент ** все читатели могут начать читать buffer0, что является небольшой задержкой после buffer0 было написано, но вместо этого они должны ждать до следующего периода писателя.

Вопрос

Вопрос в том, как мне справиться с этим? Если я хочу, чтобы писатель выполнял точно в нужный период, ему нужно ждать период, используя функцию RTAI, и я не могу сделать это как

Writer (case 4):
acquire lock 0                        
loop
    write to current buffer
    loop a few times or until the buffer has been swapped
        sleep a little
        acquire other lock
        free this lock
        swap buffers
    wait for next period

Это вводит джиттер. потому что "несколько раз" может оказаться длиннее, чем "ждать следующего периода", поэтому автор может пропустить начало своего периода.

Просто чтобы быть более понятным, вот что я хочу, чтобы произошло:

writer writes to buffer0                reader is reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      As soon as all readers finish reading,
|                                         the buffer is swapped
|                                       readers start reading buffer0
writer wakes up                         |
writes to buffer1

Что я нашел уже

Я нашел read-copy-update, который, насколько я понял, продолжает выделять память для буферов и освобождает их до тех пор, пока читатели не закончат с ними, что для меня невозможно по многим причинам. Во-первых, потоки распределяются между ядром и пространством пользователя. Во-вторых, с помощью RTAI вы не можете распределять память в потоке реального времени (потому что тогда ваш поток будет вызывать системные вызовы Linux и, следовательно, нарушать работу в реальном времени! (Не говоря уже о том, что использование собственной реализации RCU в Linux бесполезно из-за по тем же причинам)

Я также думал о том, чтобы иметь дополнительный поток, который на более высокой частоте пытается поменять буферы, но это не очень хорошая идея. Во-первых, он сам должен был бы синхронизироваться с писателем, а во-вторых, у меня много таких писателей-читателей, работающих в разных частях параллельно, и один дополнительный поток для каждого писателя кажется слишком большим. Один поток для всех авторов кажется очень сложным в отношении синхронизации с каждым автором.

5 ответов

Решение

Какой API вы используете для блокировки чтения-записи? У вас есть временная блокировка, например, pthread_rwlock_timedwrlock? Если да, я думаю, что это решение вашей проблемы, как в следующем коде:

void *buf[2];

void
writer ()
{
  int lock = 0, next = 1;

  write_lock (lock);
  while (1)
    {
      abs_time tm = now() + period;

      fill (buf [lock]);
      if (timed_write_lock (next, tm))
        {
          unlock (lock);
          lock = next;
          next = (next + 1) & 1;
        }
      wait_period (tm);
    }
}


void
reader ()
{
  int lock = 0;
  while (1)
    {
      reade_lock (lock);
      process (buf [lock]);
      unlock (lock);
      lock = (lock + 1) & 1;
    }
}

Что здесь происходит, так это то, что для автора не имеет значения, ожидает ли он блокировки или в течение следующего периода, если он обязательно проснется до наступления следующего периода. Абсолютный тайм-аут гарантирует это.

Если вы не хотите, чтобы писатель ждал, возможно, он не получит блокировку, которую может удержать кто-либо еще. Я хотел бы, чтобы он выполнял какую-то синхронизацию, чтобы убедиться, что записываемые данные действительно записаны - как правило, большинство вызовов синхронизации приводят к выполнению инструкции очистки памяти или барьера, но детали зависят от модели памяти. вашего процессора и реализации вашего пакета потоков.

Я бы посмотрел, есть ли какой-нибудь другой синхронизирующий примитив, который подходит лучше, но если толчок придет, я получу блокировку писателя и разблокирую блокировку, которую никто больше не использует.

Затем читатели должны быть готовы пропустить что-то время от времени, и должны быть в состоянии определить, когда они что-то пропустили. Я бы связал флаг достоверности и длинный счетчик последовательностей с каждым буфером, а писатель сделал что-то вроде "очистить флаг достоверности, увеличить количество последовательностей, синхронизировать, записать в буфер, увеличить число последовательностей, установить флаг достоверности, синхронизировать". Если считыватель читает счетчик последовательностей, синхронизирует, видит флаг достоверности true, считывает данные, синхронизирует и повторно считывает тот же счетчик последовательностей, то, возможно, есть некоторая надежда, что он не получил искаженные данные.

Если вы собираетесь это сделать, я бы проверил это исчерпывающе. Это выглядит правдоподобно для меня, но это может не работать с вашей конкретной реализацией всего, от компилятора до модели памяти.

Другая идея, или способ проверить это, - добавить контрольную сумму в буфер и записать ее последней.

См. Также поиск по алгоритмам без блокировки, таким как http://www.rossbencina.com/code/lockfree

Чтобы пойти с этим, вы, вероятно, хотите, чтобы писатель дал сигнал спящим читателям. Для этого вы можете использовать семафоры Posix - например, попросите читателя попросить автора вызвать sem_post() для определенного семафора, когда он достигнет заданного порядкового номера или когда буфер станет действительным.

Разве это не та проблема, которую должна решить тройная буферизация? Итак, у вас есть 3 буфера, давайте назовем их write1, write2 и read. Поток записи чередуется между записью в write1 и write2, гарантируя, что они никогда не блокируются, и что последний полный кадр всегда доступен. Затем в потоках чтения, в некоторой подходящей точке (скажем, непосредственно перед или после чтения кадра), буфер чтения переключается с доступным буфером записи.

Хотя это гарантирует, что записывающие устройства никогда не блокируются (переворачивание буфера может быть выполнено очень быстро, просто переключив два указателя, возможно, даже с атомарным CAS вместо блокировки), все еще остается проблема читателей, ожидающих завершения других читателей. с буфером чтения перед переключением. Я полагаю, что это можно решить слегка RCU-esque с помощью пула буферов чтения, где можно перевернуть доступный.

  • Использовать очередь (связанный список FIFO)
  • Писатель в реальном времени всегда будет добавлять (ставить в очередь) в конец очереди
  • Читатели всегда удаляют (удаляют) из начала очереди
  • Читатели заблокируют, если очередь пуста

редактировать, чтобы избежать динамического выделения

Возможно, я бы использовал круговую очередь... Я бы использовал встроенные __sync атомарные операции. http://gcc.gnu.org/onlinedocs/gcc-4.1.0/gcc/Atomic-Builtins.html

  • Круговая очередь (FIFO 2d массив)
    • например: byte[][] Array = новый байт [MAX_SIZE][BUFFER_SIZE];
    • Указатели начала и конца индекса
  • Writer перезаписывает буфер в массиве [End] []
    • Writer может увеличивать Start, если он заканчивается циклом
  • Reader получает буфер из массива [Start][]
    • Считыватель блокируется, если Start == End

Другой вариант - придерживаться блокировки, но убедитесь, что читатели никогда не зависают слишком долго, удерживая блокировку. Читатели могут сохранять время, затрачиваемое на удержание блокировки, коротким и предсказуемым, не делая ничего другого, пока они удерживают эту блокировку, кроме копирования данных из буфера пишущего устройства. Единственная проблема заключается в том, что считыватель с низким приоритетом может быть прерван задачей с более высоким приоритетом на полпути после записи, и лекарство от этого - http://en.wikipedia.org/wiki/Priority_ceiling_protocol.

Учитывая это, если поток записи имеет высокий приоритет, наихудшая работа, выполняемая для каждого буфера, заключается в том, что поток записи заполняет буфер, а каждый поток чтения копирует данные из этого буфера в другой буфер. Если вы можете позволить себе это в каждом цикле, то процесс записи и некоторый объем копирования данных считывателя всегда будут завершены, в то время как считыватели, обрабатывающие скопированные данные, могут выполнять или не выполнять свою работу. Если они этого не сделают, они будут отставать и заметят это, когда они в следующий раз захватят блокировку и оглянутся, чтобы увидеть, какой буфер они хотят скопировать.

FWIW, мой опыт чтения кода в реальном времени (когда требуется показать, что ошибок есть, а не в нашем коде), заключается в том, что он невероятно и намеренно прост, очень четко изложен и не обязательно более эффективен, чем он должно быть, чтобы уложиться в сроки, поэтому некоторое, по-видимому, бессмысленное копирование данных, чтобы обеспечить прямую блокировку для работы, может быть хорошей сделкой, если вы можете себе это позволить.

Другие вопросы по тегам