Как работает схема разрушения LMAX?
Я пытаюсь понять картину разрушителя. Я посмотрел видео InfoQ и попытался прочитать их статью. Я понимаю, что задействован кольцевой буфер, который инициализируется как чрезвычайно большой массив, чтобы использовать преимущества локальности кэша и исключить выделение новой памяти.
Похоже, есть одно или несколько атомных целых чисел, которые отслеживают позиции. Кажется, что каждое "событие" получает уникальный идентификатор, и его положение в кольце определяется путем определения его модуля относительно размера кольца и т. Д. И т. Д.
К сожалению, у меня нет интуитивного понимания того, как это работает. Я сделал много торговых приложений и изучил модель актера, посмотрел SEDA и т. Д.
В своей презентации они упомянули, что этот шаблон в основном работает как маршрутизаторы; однако я не нашел ни одного хорошего описания того, как работают маршрутизаторы.
Есть ли хорошие указатели для лучшего объяснения?
5 ответов
Проект Google Code ссылается на технический документ по реализации кольцевого буфера, однако он немного сухой, академичный и трудный для того, кто хочет узнать, как он работает. Однако, есть некоторые сообщения в блоге, которые начали объяснять внутренности более читабельным способом. Существует объяснение кольцевого буфера, который является ядром шаблона прерывателя, описание потребительских барьеров (часть, относящаяся к чтению из прерывателя) и некоторая информация об обработке нескольких доступных производителей.
Самое простое описание Disruptor: это способ отправки сообщений между потоками наиболее эффективным способом. Его можно использовать в качестве альтернативы очереди, но он также имеет ряд общих возможностей с SEDA и Actors.
По сравнению с очередями:
Disruptor предоставляет возможность передавать сообщение в другие потоки, вызывая его при необходимости (по аналогии с BlockingQueue). Однако есть 3 четких различия.
- Пользователь Disruptor определяет, как хранятся сообщения, расширяя класс Entry и предоставляя фабрику для предварительного распределения. Это позволяет либо повторно использовать (копировать) память, либо запись может содержать ссылку на другой объект.
- Ввод сообщений в Disruptor является двухфазным процессом, сначала в кольцевом буфере запрашивается слот, который предоставляет пользователю запись, которая может быть заполнена соответствующими данными. Затем запись должна быть зафиксирована, этот двухэтапный подход необходим для обеспечения гибкого использования памяти, упомянутой выше. Именно фиксация делает сообщение видимым для потоков потребителя.
- Ответственность за отслеживание сообщений, полученных из кольцевого буфера, лежит на потребителе. Удаление этой ответственности от самого кольцевого буфера помогло уменьшить количество конфликтов записи, поскольку каждый поток поддерживает свой собственный счетчик.
По сравнению с актерами
Модель Actor ближе к Disruptor, чем большинство других моделей программирования, особенно если вы используете предоставленные классы BatchConsumer/BatchHandler. Эти классы скрывают все сложности обслуживания использованных порядковых номеров и предоставляют набор простых обратных вызовов, когда происходят важные события. Тем не менее, есть несколько тонких различий.
- В Disruptor используется потребительская модель "1 поток - 1", в которой субъекты используют модель N:M, т.е. вы можете иметь столько акторов, сколько захотите, и они будут распределены по фиксированному количеству потоков (обычно по 1 на ядро).
- Интерфейс BatchHandler обеспечивает дополнительный (и очень важный) обратный вызов
onEndOfBatch()
, Это позволяет медленным потребителям, например тем, кто выполняет ввод / вывод, объединять события в пакеты для повышения пропускной способности. Пакетирование можно выполнять в других средах Actor, однако, поскольку почти во всех других инфраструктурах нет обратного вызова в конце пакета, вам нужно использовать тайм-аут для определения конца пакета, что приводит к низкой задержке.
По сравнению с SEDA
LMAX создал шаблон Disruptor для замены подхода, основанного на SEDA.
- Основным улучшением, которое он обеспечил по сравнению с SEDA, была возможность выполнять работу параллельно. Для этого Disruptor поддерживает многоадресную передачу одинаковых сообщений (в одном и том же порядке) нескольким потребителям. Это исключает необходимость использования вилочных ступеней в трубопроводе.
- Мы также позволяем потребителям ждать результатов других потребителей без необходимости ставить очередную стадию очереди между ними. Потребитель может просто посмотреть порядковый номер потребителя, от которого он зависит. Это устраняет необходимость в этапах соединения в конвейере.
По сравнению с барьерами памяти
Еще один способ думать об этом - это структурированный, упорядоченный барьер памяти. Где барьер производителя формирует барьер записи, а потребительский барьер - барьер чтения.
Сначала мы хотели бы понять модель программирования, которую он предлагает.
Есть один или несколько авторов. Есть один или несколько читателей. Существует строка записей, полностью упорядоченная от старой к новой (изображена слева направо). Авторы могут добавлять новые записи на правом конце. Каждый читатель читает записи последовательно слева направо. Читатели не могут читать прошлых авторов, очевидно.
Нет понятия удаления записи. Я использую "читатель" вместо "потребитель", чтобы избежать изображения потребляемых записей. Однако мы понимаем, что записи слева от последнего читателя становятся бесполезными.
Обычно читатели могут читать одновременно и независимо. Однако мы можем объявить зависимости среди читателей. Читательские зависимости могут быть произвольным ациклическим графом. Если читатель B зависит от читателя A, читатель B не может прочитать мимо читателя A.
Зависимость от читателя возникает потому, что читатель A может аннотировать запись, а читатель B зависит от этой аннотации. Например, A выполняет некоторые вычисления для записи и сохраняет результат в поле. a
в записи. А затем двигаться дальше, и теперь B может прочитать запись и значение a
Хранится Если читатель C не зависит от A, C не должен пытаться читать a
,
Это действительно интересная модель программирования. Независимо от производительности, одна модель может принести пользу многим приложениям.
Конечно, главная цель LMAX - производительность. Он использует предварительно распределенное кольцо записей. Кольцо достаточно большое, но оно ограничено, поэтому система не будет загружена сверх проектной мощности. Если кольцо заполнено, автор (ы) будет ждать, пока самые медленные читатели не продвинутся и освободят место.
Объекты входа предварительно выделены и живут вечно, чтобы снизить стоимость сборки мусора. Мы не вставляем новые объекты записей и не удаляем старые объекты записей, вместо этого автор запрашивает уже существующую запись, заполняет ее поля и уведомляет читателей. Это кажущееся двухфазное действие действительно просто атомное действие
setNewEntry(EntryPopulator);
interface EntryPopulator{ void populate(Entry existingEntry); }
Предварительное выделение записей также означает, что смежные записи (очень вероятно) размещаются в соседних ячейках памяти, и, поскольку читатели читают записи последовательно, важно использовать кэши ЦП.
И много усилий, чтобы избежать блокировки, CAS, даже барьера памяти (например, использовать переменную энергонезависимой последовательности, если есть только один модуль записи)
Для разработчиков читателей: Разные читатели-аннотаторы должны писать в разные поля, чтобы избежать конфликта записи. (На самом деле они должны писать в разные строки кэша.) Аннотирующий читатель не должен касаться чего-либо, что могут прочитать другие независимые читатели. Вот почему я говорю, что эти читатели комментируют записи, а не изменяют записи.
Мартин Фаулер написал статью о LMAX и структуре разрушителя, архитектуре LMAX, которая может прояснить это далее.
Я действительно нашел время, чтобы изучить фактический источник, из чистого любопытства, и идея этого довольно проста. Самая последняя версия на момент написания этого поста - 3.2.1.
Существует буфер, в котором хранятся предварительно распределенные события, которые будут содержать данные для чтения потребителями.
Буфер поддерживается массивом флагов (целочисленным массивом) его длины, который описывает доступность слотов буфера (подробнее см. Далее). Доступ к массиву осуществляется как java#AtomicIntegerArray, поэтому для целей данного объяснения вы можете также предположить, что он равен единице.
Может быть любое количество производителей. Когда производитель хочет записать в буфер, генерируется длинное число (как при вызове AtomicLong # getAndIncrement, Disruptor фактически использует свою собственную реализацию, но работает аналогичным образом). Давайте назовем это сгенерированное длинным продюсером allCallId. Аналогичным образом, customerCallId генерируется, когда потребитель заканчивает чтение слота из буфера. Доступ к последнему customerCallId
(Если есть много потребителей, выбирается звонок с самым низким идентификатором.)
Затем эти идентификаторы сравниваются, и, если разница между ними меньше, чем на стороне буфера, производителю разрешается писать.
(Если providerCallId больше, чем недавний customerCallId + bufferSize, это означает, что буфер заполнен, и производитель вынужден ждать шины, пока не станет доступным место.)
Затем производителю присваивается слот в буфере на основе его callId (который является prducerCallId по модулю bufferSize, но так как bufferSize всегда равен степени 2 (ограничение принудительно устанавливается при создании буфера), используемой операцией во всех действиях является ManufacturerCallId & (bufferSize - 1).)). Затем можно свободно изменять событие в этом слоте.
(Реальный алгоритм немного сложнее, включающий кэширование недавнего потребителя в отдельном атомарном справочнике для целей оптимизации.)
Когда событие было изменено, изменение "опубликовано". При публикации соответствующего слота в массиве флагов заполняется обновленный флаг. Значение флага - это номер цикла (providerCallId, деленный на bufferSize (опять же, поскольку bufferSize имеет степень 2, фактическая операция - сдвиг вправо).
Аналогичным образом может быть любое количество потребителей. Каждый раз, когда потребитель хочет получить доступ к буферу, создается customerCallId (в зависимости от того, как потребители были добавлены в разрушитель, атом, используемый в генерации идентификатора, может быть общим или отдельным для каждого из них). Затем этот customerCallId сравнивается с последним producentCallId, и, если он меньше двух, читателю разрешается прогрессировать.
(Точно так же, если providerCallId равен даже customerCallId, это означает, что буфер пуст, а потребитель вынужден ждать. Способ ожидания определяется WaitStrategy во время создания прерывателя.)
Для отдельных потребителей (тех, у кого есть собственный генератор идентификаторов), следующая проверенная вещь - это возможность пакетного потребления. Слоты в буфере проверяются по порядку, начиная с единицы, соответствующей customerCallId (индекс определяется таким же образом, как и для производителей), до той, которая соответствует недавнему providerCallId.
Они проверяются в цикле путем сравнения значения флага, записанного в массиве флага, со значением флага, созданным для consumerCallId. Если флаги совпадают, это означает, что производители, заполняющие слоты, передали свои изменения. Если нет, цикл прерывается, и возвращается самый высокий зафиксированный changeId. Слоты от ConsumerCallId до полученных в changeId можно использовать в пакетном режиме.
Если группа потребителей читает вместе (те, у кого общий генератор идентификаторов), каждый из них принимает только один callId, и только слот для этого единственного callId проверяется и возвращается.
Из этой статьи:
Схема разрушения представляет собой очередь пакетирования, поддерживаемую кольцевым массивом (т.е. кольцевым буфером), заполненным предварительно выделенными объектами передачи, которые используют барьеры памяти для синхронизации производителей и потребителей через последовательности.
Барьеры памяти трудно объяснить, и блог Триши, на мой взгляд, сделал лучшую попытку в этом посте: http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast.html
Но если вы не хотите углубляться в детали низкого уровня, вы можете просто знать, что барьеры памяти в Java реализуются через volatile
ключевое слово или через java.util.concurrent.AtomicLong
, Последовательности паттерна разрушения AtomicLong
s и передаются взад и вперед среди производителей и потребителей через барьеры памяти вместо замков.
Мне проще понять концепцию с помощью кода, поэтому приведенный ниже код представляет собой простой helloworld от CoralQueue, который представляет собой реализацию шаблона разрушителя, выполненную CoralBlocks, с которой я связан. В приведенном ниже коде вы можете увидеть, как шаблон прерывателя реализует пакетирование и как кольцевой буфер (т. Е. Круговой массив) обеспечивает бесперебойную связь между двумя потоками:
package com.coralblocks.coralqueue.sample.queue;
import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;
public class Sample {
public static void main(String[] args) throws InterruptedException {
final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);
Thread consumer = new Thread() {
@Override
public void run() {
boolean running = true;
while(running) {
long avail;
while((avail = queue.availableToPoll()) == 0); // busy spin
for(int i = 0; i < avail; i++) {
MutableLong ml = queue.poll();
if (ml.get() == -1) {
running = false;
} else {
System.out.println(ml.get());
}
}
queue.donePolling();
}
}
};
consumer.start();
MutableLong ml;
for(int i = 0; i < 10; i++) {
while((ml = queue.nextToDispatch()) == null); // busy spin
ml.set(System.nanoTime());
queue.flush();
}
// send a message to stop consumer...
while((ml = queue.nextToDispatch()) == null); // busy spin
ml.set(-1);
queue.flush();
consumer.join(); // wait for the consumer thread to die...
}
}