Параллельность с DDD + CRQS + ES
Некоторое время я изучал DDD и наткнулся на шаблоны проектирования, такие как CQRS и Event Sourcing (ES). Эти шаблоны могут быть использованы для достижения некоторых концепций DDD с меньшими усилиями.
Затем я начал разрабатывать простое программное обеспечение для реализации всех этих концепций. И начал представлять себе возможные пути отказа.
Просто чтобы прояснить мою архитектуру, на следующем рисунке описан один запрос, поступающий от внешнего интерфейса и достигающий контроллера. Я - серверный (для простоты я проигнорировал все фильтры, связующие).
- Актер отправляет форму с суммой денег, которую он хочет снять с одного счета.
- контроллер передает модель представления на прикладной уровень, где она будет преобразована в одну команду
- прикладной уровень открывает одну единицу работы (UOW), сопоставляет виртуальную машину с командой и отправляет команду диспетчеру.
- диспетчер находит соответствующий агрегатный класс, который знает, как обрабатывать команду (учетную запись), и запрашивает фабрику для конкретного экземпляра учетной записи.
- фабрика создает новый экземпляр учетной записи и запрашивает все события из хранилища событий.
- хранилище событий возвращает все события учетной записи.
- фабрика отправляет все события в агрегат, чтобы обеспечить правильное внутреннее состояние. И вернуть экземпляр аккаунта.
- диспетчер отправляет команду учетной записи, чтобы ее можно было обработать.
- проверка счета, если у него достаточно денег, чтобы сделать вывод. И если это так, он отправляет одно новое событие "MoneyWithdrawnEvent".
- это событие обрабатывается агрегатом (учетной записью), изменяющим его внутреннее состояние.
- прикладной уровень закрывает UOW, и когда это происходит, UOW проверяет все загруженные агрегаты, чтобы проверить, есть ли у них новые события для сохранения в хранилище событий. Если они есть, он отправляет события в хранилище.
- хранилище сохраняет события в хранилище событий.
Существует много слоев, которые можно добавить, например: кэширование агрегатов, кэширование событий, снимки и т. Д.
Иногда ES можно использовать параллельно с реляционной базой данных. Таким образом, когда UOW сохраняет новые события, которые произошли, он также сохраняет агрегаты в реляционной базе данных.
Одним из преимуществ ES является то, что у нее есть один центральный источник правды, хранилище событий. Таким образом, даже если модели в памяти или даже в реляционной базе данных повреждены, мы можем перестроить модель по событиям.
И имея этот источник правды, мы можем создавать другие системы, которые могут по-разному воспринимать события, чтобы сформировать другую модель.
Однако, чтобы это работало, нам нужно, чтобы источник истины был чистым, а не искаженным. В противном случае все эти преимущества не будут существовать.
Тем не менее, если мы рассмотрим параллелизм в архитектуре, описанной в образе, могут возникнуть некоторые проблемы:
- если актер дважды отправляет форму бэкэнду за период сортировки, а бэкэнд запускает два потока (по одному для каждого запроса), тогда они будут два раза вызывать прикладной уровень и запускать два UOW и т. д. Это может привести к тому, что два события будут сохранены в хранилище событий.
Эта проблема может быть решена во многих местах:
Внешний интерфейс может контролировать, какой пользователь / субъект может выполнять какие действия и сколько раз.
Диспетчер может иметь один кэш всех команд, которые обрабатываются, и, если есть команда, которая ссылается на тот же агрегат (учетную запись), он генерирует исключение.
Репозиторий может создать новый экземпляр агрегата и запустить все события из хранилища событий непосредственно перед сохранением, чтобы проверить, совпадает ли версия с версией, выбранной на шаге 7.
Проблемы с каждым решением:
Внешний интерфейс
- Пользователь может обойти это ограничение, отредактировав некоторый JavaScript.
- Если открыто несколько сеансов (например, в разных браузерах), необходимо статическое поле, содержащее ссылку на все открытые сеансы. и было бы необходимо заблокировать некоторую статическую переменную для доступа к этому полю.
- Если для определенного выполняемого действия (горизонтальное масштабирование) существует несколько серверов, это статическое поле не будет работать, поскольку необходимо будет использовать его для всех серверов. Таким образом, некоторый слой будет необходим (например, Redis).
Кэш команд
Чтобы это решение работало, необходимо было бы заблокировать некоторую статическую переменную кеша команд при чтении и записи в него.
Если для конкретного варианта использования выполняемого прикладного уровня существует несколько Серверов (горизонтальное масштабирование), этот статический кэш не будет работать, потому что это необходимо для совместного использования на всех серверах. Таким образом, некоторый слой будет необходим (например, Redis).
Проверка версии репозитория
Чтобы это решение работало, необходимо будет заблокировать некоторую статическую переменную перед выполнением проверки (версия базы данных равна версии, выбранной на шаге 7) и сохранением.
Если бы система была распределена (горизонтальный масштаб), необходимо было бы заблокировать хранилище событий. Потому что, в противном случае, оба процесса могут пройти проверку (версия базы данных равна версии, выбранной на шаге 7), и затем один сохраняет, а затем другой сохраняет. И в зависимости от технологии невозможно заблокировать хранилище событий. Таким образом, будет еще один уровень для сериализации каждого доступа к хранилищу событий и добавления возможности заблокировать хранилище.
Эти решения, которые блокируют статическую переменную, в некоторой степени хороши, потому что они являются локальными переменными и очень быстры Тем не менее, в зависимости от чего-то вроде Redis добавляет несколько больших задержек. И даже больше, если говорить о блокировке доступа к базам данных (хранилище событий). И даже больше, если это нужно сделать через один другой сервис.
Я хотел бы знать, есть ли какое-либо другое возможное решение для решения этой проблемы, потому что это серьезная проблема (повреждение хранилища событий), и если нет пути ее обхода, вся концепция кажется ошибочной.
Я открыт для любых изменений в архитектуре. Если, например, одним из решений является добавление одной шины событий, чтобы все проходило через нее, это нормально, но я не вижу, чтобы это решило проблему.
Другой момент, с которым я не знаком, это Кафка. Я не знаю, есть ли какое-то решение, которое Кафка предлагает для этой проблемы.
2 ответа
Хотя все предоставленные вами решения могут работать в некоторых конкретных сценариях, я думаю, что последнее решение (3.2) подходит для более общего случая использования. Я использую его в моей среде с открытым исходным кодом, и она работает очень хорошо.
Таким образом, хранилище событий отвечает за то, чтобы агрегат не мутировал одновременно двумя командами.
Один из способов сделать это - использовать оптимистическую блокировку. Когда Агрегат загружается из хранилища событий, вы помните его version
, Когда вы сохраняете события, вы пытаетесь добавить их с version + 1
, Вы должны иметь уникальный индекс на AggregateType-AggregateId-version
, Если добавление не удается, вы должны повторить весь процесс (load + handle + append).
Я думаю, что это наиболее масштабируемое решение, так как оно работает даже с разделением, когда ключ разделения является подмножеством AggregateId.
Вы можете легко использовать MongoDB в качестве EventStore. В MongoDB <= 3.6 вы можете добавлять все события атомарно, вставляя один документ с вложенным документом, содержащим массив событий.
Другое решение заключается в использовании пессимистической блокировки. Вы запускаете транзакцию перед загрузкой Агрегата, добавляете события, увеличиваете его версию и фиксируете. Вам необходимо использовать 2 таблицы / коллекции, одну для совокупных метаданных + версия и одну для реальных событий. MongoDB >= 4.0 имеет транзакции.
В обоих этих решениях хранилище событий не повреждено.
Другой момент, с которым я не знаком, это Кафка. Я не знаю, есть ли какое-то решение, которое Кафка предлагает для этой проблемы.
Вы можете использовать Kafka с источником событий, но вам нужно изменить свою архитектуру. Смотрите этот ответ.
Короткий ответ: атомарные транзакции все еще вещь.
Более длинный ответ: для правильной обработки одновременных записей вам нужна либо блокировка, либо условные записи (например, сравнение и своп).
Используя журнал: нам нужно получить блокировку до шага 6 и снять блокировку после шага 12.
Используя условную запись: на шаге 6 хранилище будет захватывать предикат параллелизма (который может быть неявным - например, количество прочитанных событий). При выполнении записи на шаге 12 предикат параллелизма будет проверяться, чтобы убедиться, что не было одновременных изменений.
Например, HTTP API для Event Store использует ES-ExpectedVersion; клиент отвечает за вычисления (от событий, которые он получил), где он ожидает, что запись произойдет.
Габриэль Шенкер описывает как хранилище RDBMS, так и хранилище хранилища событий в своей статье " Подбор источников событий для эссе 2015" - хранилище.
Конечно, с введением условной записи вы должны подумать о том, что вы хотите, чтобы модель делала в случае сбоя записи. Вы можете ввести стратегию повтора (перейти к шагу 6), или попробовать стратегию слияния, или просто потерпеть неудачу и вернуться к отправителю.
В вашем примере условной записи я предполагаю, что на шаге 11 необходимо будет добавить блокировку (чтобы она блокировала хранилище событий для извлечения предиката параллелизма). И снимите блокировку только после записи новых событий в хранилище событий. В противном случае два параллельных процесса могут пройти проверку предиката параллелизма и сохранить события.
Не обязательно.
Если в вашем хранилище постоянных данных предусмотрены блокировки, а не условные записи, у вас есть правильная идея: на шаге 12 хранилище получит блокировку, проверит предварительное условие, зафиксирует новые события и снимет блокировку.
Но устройство персистентности, которое понимает условные записи, может реализовать эту проверку для вас. Используя хранилище событий, хранилище не должно получать блокировку. Он отправляет события с метаданными об ожидаемом состоянии в магазин. Само хранилище событий использует эту информацию для выполнения условной записи.
В этом нет никакой магии - кто-то должен сделать работу, чтобы гарантировать, что одновременные записи не заглушают друг друга. Но это не обязательно должно быть в вашем коде.
Обратите внимание, что я использую "Репозиторий", как описано Эриком Эвансом в синей книге - это абстракция, которая скрывает ваш выбор того, как хранить события из остальной части системы; другими словами, это адаптер, который делает ваше хранилище событий похожим на собрание событий в памяти - это не само хранилище событий.