График зависимости зависимости от разрушителя LMAX с помощью SequenceBarrier
Цель
Я пытаюсь создать отношения зависимости между обработчиками, которые несколько круглые, и я не могу понять, как сделать это правильно. Чего я хочу добиться, так это вариации producer -> [handlers 1-3] -> handler 4
,
Так, disruptor.handleEventsWith(h1, h2, h3).then(h4);
, Но у меня есть дополнительные требования, которые
- Хотя обработчики 1-3 обрабатывают сообщения параллельно, ни один из них не начинает обрабатывать следующее сообщение, пока все они не закончили предыдущее сообщение.
- После первого сообщения обработчики 1-3 ждут, пока обработчик 4 завершит самое последнее сообщение, прежде чем обрабатывать следующее сообщение.
Эквивалентная логика выполнения с использованием одного обработчика событий может быть:
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
Arrays.asList(h1, h2, h3).parallelStream()
.forEach(h -> h.onEvent(event, sequence, endOfBatch));
h4.onEvent(event, sequence, endOfBatch);
});
контекст
Контекст разработки заключается в том, что каждый обработчик 1-3 обновляет свое собственное состояние в соответствии с сообщением, и после обработки сообщения каждым из трех он находится в согласованном состоянии. Затем обработчик 4 запускает некоторую логику на основе состояния, обновленного обработчиками 1-3. Таким образом, обработчик 4 должен видеть согласованные состояния только для структур данных, поддерживаемых 1-3, что означает, что обработчики 1-3 не должны обрабатывать следующее сообщение, пока обработчик 4 не завершит работу.
(Хотя цель определенно состоит в том, чтобы использовать Disruptor для управления параллелизмом, а не java.util.Stream
.)
Не уверен, что это имеет значение, но это также тот случай, когда логика обработчика 4 может быть разбита на две части, одна из которых требует, чтобы ни один из обработчиков 1-3 не обновлялся, а другая требует только того, чтобы первая часть обработчика 4 завершилась. Таким образом, обработчики 1-3 могут обрабатывать сообщение, пока вторая часть обработчика 4 все еще выполняется.
Есть ли способ сделать это? Или, может быть, мой дизайн имеет недостатки? Я чувствую, что должен быть способ сделать это через SequenceBarrier
но я не совсем понимаю, как реализовать этот пользовательский барьер. Для обработчиков 1-3, я думаю, я хотел бы сделать барьер с логикой handlers[1:3].lastProcessedSequence() == handlers[4].lastProcessedSequence()
, но я не уверен, где поставить эту логику.
Спасибо!
1 ответ
Я бы посчитал, что обработчики не сохраняют состояния и используют обработанные ими сообщения для хранения состояния вашей системы. Таким образом, вам вообще не нужно синхронизировать ваши обработчики.