LMAX Disruptor: должен клонировать объект EventHandler, полученный от EventHandler#onEvent
У меня есть приложение со многими производителями и потребителями.
Насколько я понимаю, RingBuffer создает объекты в начале инициализации RingBuffer, а затем вы копируете объект при публикации в Ring и получаете их из него в EventHandler.
Мое приложение LogHandler буферизирует полученные события в списке для дальнейшей отправки в пакетном режиме, когда список достигает определенного размера. Таким образом, EventHandler # onEvent помещает полученный объект в список, как только он достигает размера, он отправляет его в RMI на сервер и очищает его.
Мой вопрос: нужно ли мне клонировать объект, прежде чем я добавлю его в список, как я понимаю, после использования они могут быть повторно использованы?
Нужно ли синхронизировать доступ к списку в моем EventHandler#onEvent?
3 ответа
Да, ваше понимание верно. Вы копируете свои значения в слоты кольцевого буфера и из них.
Я бы сказал, что да, вы клонируете значения, когда извлекаете их из кольцевого буфера в свой список обработчиков событий; в противном случае слот можно использовать повторно.
Вам не нужно синхронизировать доступ к списку, если это частная переменная-член вашего обработчика событий и у вас есть только один экземпляр обработчика событий на поток. Если у вас есть несколько обработчиков событий, добавляющих один и тот же (например, статический) экземпляр List, вам потребуется синхронизация.
Можно использовать слоты в RingBuffer Disruptor (включая те, которые содержат List
) без клонирования / копирования значений. Это может быть предпочтительным решением для вас, в зависимости от того, беспокоитесь ли вы о создании мусора, и действительно ли вам нужно беспокоиться о одновременных обновлениях объектов, помещаемых в RingBuffer. Если все объекты, помещаемые в список слотов, являются неизменяемыми или если они обновляются / читаются только одним потоком за раз (предварительное условие, которое Disruptor часто используется для принудительного применения), от клонирования ничего не получится поскольку они уже невосприимчивы к гонкам данных.
Что касается пакетирования, обратите внимание, что сама структура Disruptor предоставляет механизм для получения элементов из RingBuffer в пакетном режиме в потоках EventHandler. Этот подход полностью поточнобезопасен и не блокируется, и может повысить производительность, сделав ваши шаблоны доступа к памяти более предсказуемыми для ЦП.
Нет, вы не правы.
Простой ответ:
- вам не нужно ничего клонировать
- вам не нужно ничего синхронизировать
Концепция разрушителя LMAX заключается в том, что у вас есть заранее выделенный список событий . Когда издатель хочет опубликовать, он запросит следующий «БЕСПЛАТНЫЙ» экземпляр из кольцевого буфера, т.е.:
final long sequenceId = ringBuffer.next();
final SomeEvent serviceEvent = ringBuffer.get(sequenceId);
Тогда вам нужно передать данному экземпляру ваш запрос. Таким образом, вместо создания нового экземпляра запроса/события вам (против подхода ООП) вам нужно будет использовать существующий экземпляр и настроить его в соответствии со своими потребностями. Например:
serviceEvent.requestCommandCreateNewUser("banán", "jahodový");
Вы должны проявлять осторожность при реализации данного метода, чтобы не создавать новые экземпляры (чтобы соответствовать философии разрушителя). Например, вы просто скопируете параметры в заранее выделенные частные поля.
Затем вы помечаете свой экземпляр как «готовый к обработке» , используя вызов публикации с идентификатором последовательности:
ringBuffer.publish(sequenceId);
После этого вы не должны получать доступserviceEvent
экземпляр больше. По крайней мере, не как издатель...
С другой стороны , вы получите данный экземпляр для обработки с помощью обработчика . Следующий код строит часть:
final Disruptor<SomeEvent> disruptor
= new Disruptor<>(
SomeEvent.EVENT_FACTORY,
150, // we would not like to hit pathological mapping ( https://en.algorithmica.org/hpc/cpu-cache/associativity/#pathological-mappings )
threadFactory,
ProducerType.SINGLE,
waitStrategy);
disruptor.handleEventsWith(new BatchingEventHandler());
ringBuffer = disruptor.start();
Это всего лишь установка. Фактическая обработка событий находится в . Также может быть пакетирование. Умное пакетирование, возможно, является лучшим подходом.
Smartbatching просто берет то, что подготовлено в кольцевом буфере, и создает партию. Не имеет значения, обрабатываются ли одно событие или 10 событий в пакетном режиме. Тем временем входящие запросы помещаются в кольцевой буфер. Когда реальный пакет будет завершен, алгоритм снова возьмет все, что подготовлено в кольцевом буфере, и обработает все события в пакетном режиме...
Посмотри наBatchingEventHandler
сорт:
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.Sequence;
import java.util.ArrayList;
public class BatchingEventHandler implements EventHandler<UnionedServiceEvent> {
/**
* Maximum events to process at once. This is upper cap for smart batch size.
*/
public static final int MAXIMUM_BATCH_SIZE = 20;
private Sequence sequenceCallback;
private final ArrayList<UnionedServiceEvent> internalBatch = new ArrayList(25);
//private int currentBatchRemaining = 20;
@Override
public void setSequenceCallback(final Sequence sequenceCallback) {
this.sequenceCallback = sequenceCallback;
}
@Override
public void onEvent(final UnionedServiceEvent event, final long sequence, final boolean endOfBatch) {
this.internalBatch.add(event);
boolean logicalChunkOfWorkComplete = isLogicalChunkOfWorkComplete(endOfBatch);
if (logicalChunkOfWorkComplete) {
// mark given instances as "free". From now, we can not use any older UnionedServiceEvent instances!
// We have forgotten them in isLogicalChunkOfWorkComplete method
sequenceCallback.set(sequence);
}
}
private boolean isLogicalChunkOfWorkComplete(boolean forceEnd) {
// Consider, what is big batch and do test for large batch when I/O is involved
boolean internalBatchMaximumSizeReached = this.internalBatch.size() >= MAXIMUM_BATCH_SIZE;
if(internalBatchMaximumSizeReached || forceEnd) {
// DO actual I/O operation with all instances.
System.out.println("Processed %s events in single batch.".formatted(this.internalBatch.size()));
this.internalBatch.clear();
}
return internalBatchMaximumSizeReached;
}
}
Для получения минимальной информации вы можете посмотреть, например, https://www.baeldung.com/lmax-disruptor-concurrency .
Для получения документации уделите время и прочтите https://lmax-exchange.github.io/disruptor/user-guide/index.html . По крайней мере, часть «Работа с большими пакетами» перед концом документа.