LMAX Disruptor: должен клонировать объект EventHandler, полученный от EventHandler#onEvent

У меня есть приложение со многими производителями и потребителями.

Насколько я понимаю, RingBuffer создает объекты в начале инициализации RingBuffer, а затем вы копируете объект при публикации в Ring и получаете их из него в EventHandler.

Мое приложение LogHandler буферизирует полученные события в списке для дальнейшей отправки в пакетном режиме, когда список достигает определенного размера. Таким образом, EventHandler # onEvent помещает полученный объект в список, как только он достигает размера, он отправляет его в RMI на сервер и очищает его.

Мой вопрос: нужно ли мне клонировать объект, прежде чем я добавлю его в список, как я понимаю, после использования они могут быть повторно использованы?

Нужно ли синхронизировать доступ к списку в моем EventHandler#onEvent?

3 ответа

Решение

Да, ваше понимание верно. Вы копируете свои значения в слоты кольцевого буфера и из них.

Я бы сказал, что да, вы клонируете значения, когда извлекаете их из кольцевого буфера в свой список обработчиков событий; в противном случае слот можно использовать повторно.

Вам не нужно синхронизировать доступ к списку, если это частная переменная-член вашего обработчика событий и у вас есть только один экземпляр обработчика событий на поток. Если у вас есть несколько обработчиков событий, добавляющих один и тот же (например, статический) экземпляр List, вам потребуется синхронизация.

Можно использовать слоты в RingBuffer Disruptor (включая те, которые содержат List) без клонирования / копирования значений. Это может быть предпочтительным решением для вас, в зависимости от того, беспокоитесь ли вы о создании мусора, и действительно ли вам нужно беспокоиться о одновременных обновлениях объектов, помещаемых в RingBuffer. Если все объекты, помещаемые в список слотов, являются неизменяемыми или если они обновляются / читаются только одним потоком за раз (предварительное условие, которое Disruptor часто используется для принудительного применения), от клонирования ничего не получится поскольку они уже невосприимчивы к гонкам данных.

Что касается пакетирования, обратите внимание, что сама структура Disruptor предоставляет механизм для получения элементов из RingBuffer в пакетном режиме в потоках EventHandler. Этот подход полностью поточнобезопасен и не блокируется, и может повысить производительность, сделав ваши шаблоны доступа к памяти более предсказуемыми для ЦП.

Нет, вы не правы.

Простой ответ:

  • вам не нужно ничего клонировать
  • вам не нужно ничего синхронизировать

Концепция разрушителя LMAX заключается в том, что у вас есть заранее выделенный список событий . Когда издатель хочет опубликовать, он запросит следующий «БЕСПЛАТНЫЙ» экземпляр из кольцевого буфера, т.е.:

              final long sequenceId = ringBuffer.next();
        final SomeEvent serviceEvent = ringBuffer.get(sequenceId);

Тогда вам нужно передать данному экземпляру ваш запрос. Таким образом, вместо создания нового экземпляра запроса/события вам (против подхода ООП) вам нужно будет использовать существующий экземпляр и настроить его в соответствии со своими потребностями. Например:

              serviceEvent.requestCommandCreateNewUser("banán", "jahodový");

Вы должны проявлять осторожность при реализации данного метода, чтобы не создавать новые экземпляры (чтобы соответствовать философии разрушителя). Например, вы просто скопируете параметры в заранее выделенные частные поля.

Затем вы помечаете свой экземпляр как «готовый к обработке» , используя вызов публикации с идентификатором последовательности:

              ringBuffer.publish(sequenceId);

После этого вы не должны получать доступserviceEventэкземпляр больше. По крайней мере, не как издатель...

С другой стороны , вы получите данный экземпляр для обработки с помощью обработчика . Следующий код строит часть:

              final Disruptor<SomeEvent> disruptor
                = new Disruptor<>(
                SomeEvent.EVENT_FACTORY,
                150, // we would not like to hit pathological mapping ( https://en.algorithmica.org/hpc/cpu-cache/associativity/#pathological-mappings )
                threadFactory,
                ProducerType.SINGLE,
                waitStrategy);
        disruptor.handleEventsWith(new BatchingEventHandler());
        ringBuffer = disruptor.start();

Это всего лишь установка. Фактическая обработка событий находится в . Также может быть пакетирование. Умное пакетирование, возможно, является лучшим подходом.

Smartbatching просто берет то, что подготовлено в кольцевом буфере, и создает партию. Не имеет значения, обрабатываются ли одно событие или 10 событий в пакетном режиме. Тем временем входящие запросы помещаются в кольцевой буфер. Когда реальный пакет будет завершен, алгоритм снова возьмет все, что подготовлено в кольцевом буфере, и обработает все события в пакетном режиме...

Посмотри наBatchingEventHandlerсорт:

      import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.Sequence;

import java.util.ArrayList;

public class BatchingEventHandler implements EventHandler<UnionedServiceEvent> {
    /**
     * Maximum events to process at once. This is upper cap for smart batch size.
     */
    public static final int MAXIMUM_BATCH_SIZE = 20;
    private Sequence sequenceCallback;
    private final ArrayList<UnionedServiceEvent> internalBatch = new ArrayList(25);
    //private int currentBatchRemaining = 20;

    @Override
    public void setSequenceCallback(final Sequence sequenceCallback) {
        this.sequenceCallback = sequenceCallback;
    }

    @Override
    public void onEvent(final UnionedServiceEvent event, final long sequence, final boolean endOfBatch) {
        this.internalBatch.add(event);

        boolean logicalChunkOfWorkComplete = isLogicalChunkOfWorkComplete(endOfBatch);
        if (logicalChunkOfWorkComplete) {
            // mark given instances as "free". From now, we can not use any older UnionedServiceEvent instances!
            // We have forgotten them in isLogicalChunkOfWorkComplete method
            sequenceCallback.set(sequence);
        }
    }

    private boolean isLogicalChunkOfWorkComplete(boolean forceEnd) {
        // Consider, what is big batch and do test for large batch when I/O is involved
        boolean internalBatchMaximumSizeReached = this.internalBatch.size() >= MAXIMUM_BATCH_SIZE;
        if(internalBatchMaximumSizeReached || forceEnd) {
            // DO actual I/O operation with all instances.
            System.out.println("Processed %s events in single batch.".formatted(this.internalBatch.size()));
            this.internalBatch.clear();
        }
        return internalBatchMaximumSizeReached;
    }
}

Для получения минимальной информации вы можете посмотреть, например, https://www.baeldung.com/lmax-disruptor-concurrency .

Для получения документации уделите время и прочтите https://lmax-exchange.github.io/disruptor/user-guide/index.html . По крайней мере, часть «Работа с большими пакетами» перед концом документа.

Другие вопросы по тегам