Источник событий с потоками Кафки
Я пытаюсь реализовать простое подтверждение концепции CQRS/ источников событий поверх потоков Kafka (как описано в https://www.confluent.io/blog/event-sourcing-using-apache-kafka/)
У меня есть 4 основные части:
commands
раздел, который использует идентификатор агрегата в качестве ключа для последовательной обработки команд для агрегатаevents
тема, в которой публикуются все изменения в агрегатном состоянии (опять же, ключ - это идентификатор агрегата). Эта тема имеет политику хранения "никогда не удалять"KTable, чтобы уменьшить совокупное состояние и сохранить его в хранилище состояний
поток событий -> группировать в Ktable по идентификатору агрегата -> уменьшить совокупные события до текущего состояния -> материализоваться как государственный магазин
процессор команд - поток команд, оставленный соединенным с агрегатным состоянием KTable. Для каждой записи в результирующем потоке используйте функцию
(command, state) => events
производить итоговые события и публиковать их вevents
тема
Вопрос в том, есть ли способ убедиться, что у меня есть последняя версия агрегата в государственном хранилище?
Я хочу отклонить команду, если нарушает бизнес-правила (например, команда для изменения объекта недействительна, если объект был помечен как удаленный). Но если DeleteCommand
опубликовано с последующим ModifyCommand
сразу после этого команда delete выдаст DeletedEvent
, но когда ModifyCommand
после обработки загруженное состояние из хранилища состояний может еще не отражать это, а конфликтующие события будут опубликованы.
Я не возражаю жертвовать пропускной способностью обработки команд, я бы предпочел получить гарантии согласованности (поскольку все сгруппированы по одному и тому же ключу и должны оказаться в одном разделе)
Надеюсь, что это было понятно:) Есть предложения?
3 ответа
Я не думаю, что Kafka хорош для CQRS и Event Sourcing, как вы это описали, потому что в нем отсутствует (простой) способ обеспечения защиты от одновременных записей. Эта статья рассказывает об этом в деталях.
То, что я имею в виду под тем, как вы описали, это тот факт, что вы ожидаете, что команда сгенерирует ноль или более событий или потерпит неудачу с исключением; это классический CQRS с источником событий. Большинство людей ожидают такой архитектуры.
Вы могли бы иметь источник событий, однако, в другом стиле. Ваши обработчики команд могут выдавать события для каждой полученной команды (т.е. DeleteWasAccepted
). Затем обработчик события может в конечном итоге обработать это событие источником события (путем восстановления состояния агрегата из его потока событий) и генерировать другие события (т. Е. ItemDeleted
или же ItemDeletionWasRejected
). Итак, команды запускаются и забываются, отправляются асинхронно, клиент не ждет немедленного ответа. Однако он ожидает события, описывающего результат выполнения его команды.
Важным аспектом является то, что обработчик событий должен обрабатывать события из одного и того же агрегата последовательно (ровно один раз и по порядку). Это может быть реализовано с использованием единой группы потребителей Kafka. Вы можете увидеть об этой архитектуре в этом видео.
Пожалуйста, прочитайте эту статью моего коллеги Джеспера. Кафка - отличный продукт, но на самом деле не совсем подходит для организации мероприятий
https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c
Возможное решение, которое я придумал, заключается в реализации своего рода оптимистического механизма блокировки:
- Добавить
expectedVersion
поле по командам - Используйте KTable
Aggregator
увеличить версию составного снимка для каждого обработанного события - Отклонить команды, если
expectedVersion
не соответствует агрегатной версии снимка
Это, кажется, обеспечивает семантику, которую я ищу