График зависимости зависимости от разрушителя LMAX с помощью SequenceBarrier

Цель

Я пытаюсь создать отношения зависимости между обработчиками, которые несколько круглые, и я не могу понять, как сделать это правильно. Чего я хочу добиться, так это вариации producer -> [handlers 1-3] -> handler 4,

Так, disruptor.handleEventsWith(h1, h2, h3).then(h4);, Но у меня есть дополнительные требования, которые

  1. Хотя обработчики 1-3 обрабатывают сообщения параллельно, ни один из них не начинает обрабатывать следующее сообщение, пока все они не закончили предыдущее сообщение.
  2. После первого сообщения обработчики 1-3 ждут, пока обработчик 4 завершит самое последнее сообщение, прежде чем обрабатывать следующее сообщение.

Эквивалентная логика выполнения с использованием одного обработчика событий может быть:

disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
  Arrays.asList(h1, h2, h3).parallelStream()
        .forEach(h -> h.onEvent(event, sequence, endOfBatch));
  h4.onEvent(event, sequence, endOfBatch);
});

контекст

Контекст разработки заключается в том, что каждый обработчик 1-3 обновляет свое собственное состояние в соответствии с сообщением, и после обработки сообщения каждым из трех он находится в согласованном состоянии. Затем обработчик 4 запускает некоторую логику на основе состояния, обновленного обработчиками 1-3. Таким образом, обработчик 4 должен видеть согласованные состояния только для структур данных, поддерживаемых 1-3, что означает, что обработчики 1-3 не должны обрабатывать следующее сообщение, пока обработчик 4 не завершит работу.

(Хотя цель определенно состоит в том, чтобы использовать Disruptor для управления параллелизмом, а не java.util.Stream.)

Не уверен, что это имеет значение, но это также тот случай, когда логика обработчика 4 может быть разбита на две части, одна из которых требует, чтобы ни один из обработчиков 1-3 не обновлялся, а другая требует только того, чтобы первая часть обработчика 4 завершилась. Таким образом, обработчики 1-3 могут обрабатывать сообщение, пока вторая часть обработчика 4 все еще выполняется.

Есть ли способ сделать это? Или, может быть, мой дизайн имеет недостатки? Я чувствую, что должен быть способ сделать это через SequenceBarrier но я не совсем понимаю, как реализовать этот пользовательский барьер. Для обработчиков 1-3, я думаю, я хотел бы сделать барьер с логикой handlers[1:3].lastProcessedSequence() == handlers[4].lastProcessedSequence(), но я не уверен, где поставить эту логику.

Спасибо!

1 ответ

Я бы посчитал, что обработчики не сохраняют состояния и используют обработанные ими сообщения для хранения состояния вашей системы. Таким образом, вам вообще не нужно синхронизировать ваши обработчики.

Другие вопросы по тегам