Запись с помощью одного потока LMAX
Я познакомился с LMAX и этой замечательной концепцией под названием RingBuffer. Так что, ребята, говорите, что при записи в кольцевой буфер с одним потоком производительность намного выше, чем у нескольких производителей...
Однако я не считаю возможным, чтобы типичное приложение использовало только один поток для записи в кольцевой буфер... Я действительно не понимаю, как lmax делает это (если они это делают). Например, N различных трейдеров выставляют ордера на биржу, это все асинхронные запросы, которые преобразуются в ордера и помещаются в кольцевой буфер, как они могут писать те, которые используют один поток?
Вопрос 1. Я мог бы что-то упустить или неправильно понять какой-то аспект, но если у вас есть N одновременно работающих производителей, как можно объединить их в 1 и не блокировать друг друга?
Вопрос 2. Я вспоминаю наблюдаемые rxJava, где вы можете взять N наблюдаемых и объединить их в 1, используя Observable.merge. Интересно, блокирует ли он какую-либо блокировку или поддерживает ее каким-либо образом?
2 ответа
Влияние многопотоковой записи на RingBuffer незначительно, но при очень больших нагрузках может быть значительным.
Реализация RingBuffer содержит next
узел, где будет сделано следующее добавление. Если в кольцо записывается только один поток, процесс всегда завершается за минимальное время, т.е. buffer[head++] = newData
,
Для обработки многопоточности, избегая блокировок, вы обычно делаете что-то вроде while ( !buffer[head++].compareAndSet(null,newValue)){}
, Этот жесткий цикл будет продолжать выполняться, пока другие потоки мешают хранению данных, тем самым замедляя пропускную способность.
Обратите внимание, что я использовал псевдокод выше, взгляните на getFree
в моей реализации здесь для реального примера.
// Find the next free element and mark it not free.
private Node<T> getFree() {
Node<T> freeNode = head.get();
int skipped = 0;
// Stop when we hit the end of the list
// ... or we successfully transit a node from free to not-free.
// This is the loop that could cause delays under hight thread activity.
while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
skipped += 1;
freeNode = freeNode.next;
}
// ...
}
Внутренне, слияние RxJava использует конструкцию сериализации, которую я называю цикл-эмиттер, который использует synchronized
и блокирует.
Наши "клиенты" используют слияние в основном в случаях нечувствительности к пропускной способности и времени ожидания или полностью однопоточных, и блокировка на самом деле не является проблемой.
Можно написать неблокирующий сериализатор, который я называю " утечка очереди", но слияние не может быть настроено для его использования.
Вы также можете взглянуть на JCTools' MpscArrayQueue
непосредственно, если вы готовы обрабатывать потоки производителя и потребителя вручную.