Источник событий с потоками Кафки

Я пытаюсь реализовать простое подтверждение концепции CQRS/ источников событий поверх потоков Kafka (как описано в https://www.confluent.io/blog/event-sourcing-using-apache-kafka/)

У меня есть 4 основные части:

  1. commands раздел, который использует идентификатор агрегата в качестве ключа для последовательной обработки команд для агрегата
  2. events тема, в которой публикуются все изменения в агрегатном состоянии (опять же, ключ - это идентификатор агрегата). Эта тема имеет политику хранения "никогда не удалять"
  3. KTable, чтобы уменьшить совокупное состояние и сохранить его в хранилище состояний

    поток событий ->
    группировать в Ktable по идентификатору агрегата ->
    уменьшить совокупные события до текущего состояния ->
    материализоваться как государственный магазин
  4. процессор команд - поток команд, оставленный соединенным с агрегатным состоянием KTable. Для каждой записи в результирующем потоке используйте функцию (command, state) => events производить итоговые события и публиковать их в events тема

Вопрос в том, есть ли способ убедиться, что у меня есть последняя версия агрегата в государственном хранилище?

Я хочу отклонить команду, если нарушает бизнес-правила (например, команда для изменения объекта недействительна, если объект был помечен как удаленный). Но если DeleteCommand опубликовано с последующим ModifyCommand сразу после этого команда delete выдаст DeletedEvent, но когда ModifyCommand после обработки загруженное состояние из хранилища состояний может еще не отражать это, а конфликтующие события будут опубликованы.

Я не возражаю жертвовать пропускной способностью обработки команд, я бы предпочел получить гарантии согласованности (поскольку все сгруппированы по одному и тому же ключу и должны оказаться в одном разделе)

Надеюсь, что это было понятно:) Есть предложения?

3 ответа

Я не думаю, что Kafka хорош для CQRS и Event Sourcing, как вы это описали, потому что в нем отсутствует (простой) способ обеспечения защиты от одновременных записей. Эта статья рассказывает об этом в деталях.

То, что я имею в виду под тем, как вы описали, это тот факт, что вы ожидаете, что команда сгенерирует ноль или более событий или потерпит неудачу с исключением; это классический CQRS с источником событий. Большинство людей ожидают такой архитектуры.

Вы могли бы иметь источник событий, однако, в другом стиле. Ваши обработчики команд могут выдавать события для каждой полученной команды (т.е. DeleteWasAccepted). Затем обработчик события может в конечном итоге обработать это событие источником события (путем восстановления состояния агрегата из его потока событий) и генерировать другие события (т. Е. ItemDeleted или же ItemDeletionWasRejected). Итак, команды запускаются и забываются, отправляются асинхронно, клиент не ждет немедленного ответа. Однако он ожидает события, описывающего результат выполнения его команды.

Важным аспектом является то, что обработчик событий должен обрабатывать события из одного и того же агрегата последовательно (ровно один раз и по порядку). Это может быть реализовано с использованием единой группы потребителей Kafka. Вы можете увидеть об этой архитектуре в этом видео.

Пожалуйста, прочитайте эту статью моего коллеги Джеспера. Кафка - отличный продукт, но на самом деле не совсем подходит для организации мероприятий

https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c

Возможное решение, которое я придумал, заключается в реализации своего рода оптимистического механизма блокировки:

  1. Добавить expectedVersion поле по командам
  2. Используйте KTable Aggregator увеличить версию составного снимка для каждого обработанного события
  3. Отклонить команды, если expectedVersion не соответствует агрегатной версии снимка

Это, кажется, обеспечивает семантику, которую я ищу

Другие вопросы по тегам