Запись с помощью одного потока LMAX

Я познакомился с LMAX и этой замечательной концепцией под названием RingBuffer. Так что, ребята, говорите, что при записи в кольцевой буфер с одним потоком производительность намного выше, чем у нескольких производителей...

Однако я не считаю возможным, чтобы типичное приложение использовало только один поток для записи в кольцевой буфер... Я действительно не понимаю, как lmax делает это (если они это делают). Например, N различных трейдеров выставляют ордера на биржу, это все асинхронные запросы, которые преобразуются в ордера и помещаются в кольцевой буфер, как они могут писать те, которые используют один поток?

Вопрос 1. Я мог бы что-то упустить или неправильно понять какой-то аспект, но если у вас есть N одновременно работающих производителей, как можно объединить их в 1 и не блокировать друг друга?

Вопрос 2. Я вспоминаю наблюдаемые rxJava, где вы можете взять N наблюдаемых и объединить их в 1, используя Observable.merge. Интересно, блокирует ли он какую-либо блокировку или поддерживает ее каким-либо образом?

2 ответа

Решение

Влияние многопотоковой записи на RingBuffer незначительно, но при очень больших нагрузках может быть значительным.

Реализация RingBuffer содержит next узел, где будет сделано следующее добавление. Если в кольцо записывается только один поток, процесс всегда завершается за минимальное время, т.е. buffer[head++] = newData,

Для обработки многопоточности, избегая блокировок, вы обычно делаете что-то вроде while ( !buffer[head++].compareAndSet(null,newValue)){}, Этот жесткий цикл будет продолжать выполняться, пока другие потоки мешают хранению данных, тем самым замедляя пропускную способность.

Обратите внимание, что я использовал псевдокод выше, взгляните на getFree в моей реализации здесь для реального примера.

  // Find the next free element and mark it not free.
  private Node<T> getFree() {
    Node<T> freeNode = head.get();
    int skipped = 0;
    // Stop when we hit the end of the list
    // ... or we successfully transit a node from free to not-free.
    // This is the loop that could cause delays under hight thread activity.
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
      skipped += 1;
      freeNode = freeNode.next;
    }
    // ...
  }

Внутренне, слияние RxJava использует конструкцию сериализации, которую я называю цикл-эмиттер, который использует synchronized и блокирует.

Наши "клиенты" используют слияние в основном в случаях нечувствительности к пропускной способности и времени ожидания или полностью однопоточных, и блокировка на самом деле не является проблемой.

Можно написать неблокирующий сериализатор, который я называю " утечка очереди", но слияние не может быть настроено для его использования.

Вы также можете взглянуть на JCTools' MpscArrayQueue непосредственно, если вы готовы обрабатывать потоки производителя и потребителя вручную.

Другие вопросы по тегам