Источник событий / модель чтения CQRS - прогнозы
У меня есть приложение на основе микросервиса, работающее на AWS Lambda. Два из самых важных микросервисов используют event-sourcing/cqrs.
Фон: (это также для меня, чтобы организовать свои мысли)
Я использую эту библиотеку и храню события в DynamoDB и прогнозы в AWS S3.
Часть записи работает как чудо: каждый вызов команды загружает текущее состояние агрегата из DynamoDB (путем запуска событий через обработчик и / или загрузки кэшированного агрегата), он решает принять или отклонить команду на основе некоторой бизнес-логики, затем пишет в DynamoDB с KeyConditionExpression: 'aggregateId = :a AND version >= :v'
где версия - это число событий, обработанных для этого агрегата. Если есть конфликт, запись не удалась. Похоже, хорошая система для меня!
Каждое событие затем передается в SNS (название темы - это имя службы), чтобы другие службы могли реагировать на событие, если они этого хотят.
Часть, с которой я действительно борюсь, - это чтение. Проекции хранятся в S3 и помечаются последним коммитом, обработанным для каждого источника события. Когда приходит запрос на чтение, он загружает все спроецированное состояние из S3 (для всех агрегатов), запрашивает источники событий для всех более новых событий, вычисляет последнее состояние (снова для всех агрегатов) и записывает обновленный объект в S3, если это новее) и возвращает соответствующие части состояния на основе параметров запроса.
Моя проблема: (или одна из них)
Я думаю, что я делаю прогнозы неправильно.
В большинстве моих проекций идентификаторы группируются только по важным атрибутам, поэтому файлы остаются относительно небольшими. Но мне также нужен способ извлечения отдельного агрегата. Использование прогнозов для этого кажется сумасшедшим, потому что мне нужно каждый раз загружать все состояние (т. Е. Каждый прогнозируемый агрегат), применять к нему новые события, а затем извлекать нужную мне запись (она, возможно, даже не изменилась).
Это то, что я делаю сейчас, это хорошо работает (<100 тыс. Записей), но я не могу себе представить, что это будет продолжаться намного дольше.
Другая проблема - это запросы. Мне нужно построить значение отображения проекции для соответствия aggregateIds для каждого атрибута, по которому мне нужно запросить!! Должен быть лучший способ!
Независимо от того, каким образом я думаю об этой проблеме, проекциям всегда нужно все текущее состояние + любые новые события, прежде чем он сможет вернуть хотя бы одну запись, которая не изменилась.
2 ответа
Я думаю, что я делаю прогнозы неправильно.
Я тоже так думаю; Похоже, ваши запросы связаны с вашими прогнозами
Когда приходит запрос на чтение, он загружает все спроецированное состояние из S3 (для всех агрегатов), запрашивает источники событий для всех более новых событий, вычисляет последнее состояние
Да, это звучит как беспорядок. Или, более конкретно, это звучит так, как будто запрос запускает работу, выполняемую проекцией.
Если вы можете отделить запросы от прогнозов, то все станет проще. Основная идея в том, что ваши запросы не описывают текущее состояние, они описывают состояние на момент последнего запуска проекции.
Та же идея, другое написание: вы отвечаете на запросы из документов, которые вы кэшируете в S3. При обнаружении новых событий ваши прогнозы запускаются, загружают новые данные по мере необходимости, вычисляют новый документ и заменяют записи в кэше.
Я думаю о треугольнике
- Команды приносят информацию извне в книгу рекордов
- Прогнозы переносят информацию из книги рекордов в кеш
- Запросы приносят информацию из кеша во внешний мир
где каждая нога треугольника проходит асинхронно с другими.
Я предлагаю вам работать в обратном направлении от запросов - какие документы вам нужны для поддержки каждого запроса? Какие цели латентности вы должны преодолеть? Затем вы начинаете балансировать компромиссы - для этого нового запроса я создаю результат из существующих документов или мне нужен новый документ, построенный с более мелким зерном?
если я правильно понимаю, я должен запускать обновления проекции по мере поступления событий, а не в совокупности при выполнении запроса. Это избавляет меня от запросов к хранилищу событий для новых событий на каждый запрос
Да, и... события - только один из способов запуска; Вы также можете запускать процессы проецирования по часам (проверяйте каждые 15 минут, чтобы узнать, нужно ли нам обновляться) или по прихоти человека-оператора (хм, похоже, баланс вашего счета устарел, позвольте мне попытаться обновить это для тебя). Есть несколько способов сделать это, и вы можете смешивать и сочетать стратегии.
Мне по-прежнему нужно загружать все состояние, как при обновлении проекции, так и при загрузке одного агрегата.
Не обязательно. Нет правила, согласно которому нельзя использовать ранее кэшированное представление в качестве отправной точки, а затем извлекать из книги записей только те изменения, которые вам необходимы.
Например, предположим, что вы создаете представление, которое объединяет агрегаты A{id:7}
а также B{id:9}
, Вы берете кэшированную копию и смотрите ее метаданные (куда вы помещаете ее в предыдущую запись) и находите внутри нее что-то вроде metadata:{A:{id:7, version:21}, B:{id:9, version:19}}
, Теперь вам нужно только загрузить события после тех, которые вы использовали в прошлый раз, обновить локальную копию в памяти, обновить локальную копию метаданных и отправить лот в кэш.
Я не знаком с вашей технической инфраструктурой, но способ реализации прогнозов заключается в следующем:
Каждое доменное событие имеет глобальный порядковый номер, который охватывает все совокупные корни. Проекция - это модель чтения с произвольным именем и последней обработанной позицией, представленной этим глобальным порядковым номером. Я могу добавить новую проекцию в любое время вместе с ее обработчиками событий, и она начнется с позиции 0. Я могу очистить проекцию в любое время и вернуть позицию на 0. Я также могу использовать комбинацию добавления новой Проекция, которая заменит существующую, построит эту сборку, даже если она займет несколько дней, а затем удалит старую.
Существует служба, которая отслеживает проекции и использует хранилище событий почти как очередь. Служба проекции проверяет события с глобальными идентификаторами после текущей позиции и передает их обработчикам, а затем обновляет позицию. Здесь ваша проекция может даже фильтровать типы событий для повышения производительности.
Это основная идея. Ваши прогнозы - это то, что вы запрашиваете. Как только проекция достигнет "головы" хранилища событий, события из хранилища событий будут поступать в проекцию.
Как это отразится на вашем техническом пространстве, я не совсем уверен. У меня есть небольшой эксперимент под названием Shuttle.Позвоните на C#, если вы хотите взглянуть, чтобы получить некоторые идеи.