Использование Kafka как хранилища событий (CQRS). Отличная идея?

Хотя я встречал Кафку раньше, я только недавно понял, что Кафку, возможно, можно использовать как (основу) CQRS, хранилище событий.

Один из основных моментов, которые поддерживает Кафка:

  • Захват / хранение событий, все ГА, конечно.
  • Паб / суб архитектура
  • Возможность воспроизведения журнала событий, что позволяет новым подписчикам регистрироваться в системе после факта.

По общему признанию, я не на 100% разбираюсь в CQRS / Event Sourcing, но это кажется довольно близко к тому, каким должно быть хранилище событий. Забавно то, что я действительно не могу найти так много о том, что Кафка используется как хранилище событий, так что, возможно, я что-то упускаю.

Итак, чего не хватает в Кафке, чтобы он был хорошим хранилищем событий? Будет ли это работать? Используя это производство? Интересует понимание, ссылки и т. Д.

По сути, состояние системы сохраняется на основе транзакций / событий, которые система когда-либо получала, вместо простого сохранения текущего состояния / снимка системы, что обычно и делается. (Думайте об этом как о Главной книге в бухгалтерии: все транзакции в конечном итоге сводятся к конечному состоянию). Это позволяет делать разные интересные вещи, но просто читайте по предоставленным ссылкам.

9 ответов

Решение

Kafka - это система обмена сообщениями, которая имеет много общего с хранилищем событий, однако, процитирую их введение:

Кластер Kafka сохраняет все опубликованные сообщения - независимо от того, были ли они использованы - втечение настраиваемого периода времени. Например, если срок хранения составляет два дня, то в течение двух дней после публикации сообщения оно становится доступным для использования, после чего оно будет сброшено для освобождения места. Производительность Kafka практически постоянна по отношению к размеру данных, поэтому сохранение большого количества данных не является проблемой.

Таким образом, хотя сообщения потенциально могут храниться неопределенно долго, ожидается, что они будут удалены. Это не означает, что вы не можете использовать это как хранилище событий, но может быть лучше использовать что-то еще. Взгляните на EventStore для альтернативы.

ОБНОВИТЬ

Кафка документация:

Источник событий - это стиль разработки приложения, в котором изменения состояния записываются в виде упорядоченной по времени последовательности записей. Поддержка Kafka очень больших хранимых данных журнала делает его отличным бэкэндом для приложения, созданного в этом стиле.

ОБНОВЛЕНИЕ 2

Одной из проблем, связанных с использованием Kafka для поиска событий, является количество необходимых тем. Обычно в источнике событий имеется поток (тема) событий для каждой сущности (такой как пользователь, продукт и т. Д.). Таким образом, текущее состояние объекта может быть восстановлено путем повторного применения всех событий в потоке. Каждый раздел Kafka состоит из одного или нескольких разделов, и каждый раздел хранится в виде каталога в файловой системе. Также будет давление со стороны ZooKeeper по мере увеличения количества узлов.

Я один из оригинальных авторов Кафки. Кафка будет очень хорошо работать в качестве журнала для поиска событий. Он отказоустойчив, масштабируется до огромных размеров данных и имеет встроенную модель разделения.

Мы используем его для нескольких случаев использования этой формы в LinkedIn. Например, наша система обработки потоков с открытым исходным кодом, Apache Samza, поставляется со встроенной поддержкой источников событий.

Я думаю, что вы мало что слышите об использовании Kafka для источников событий, в первую очередь потому, что терминология источников событий, кажется, не очень распространена в потребительском веб-пространстве, где Kafka наиболее популярен.

Я написал немного об этом стиле использования Кафки здесь.

Я продолжаю возвращаться к этому QA. И я не нашел существующие ответы достаточно нюансированные, поэтому я добавляю этот.

TL; DR. Да или Нет, в зависимости от вашего использования источника событий.

Я знаю о двух основных видах систем, основанных на событиях.

Нижестоящие процессоры событий = Да

В такой системе события происходят в реальном мире и записываются как факты. Например, складская система для отслеживания поддонов с продуктами. Там в основном нет конфликтующих событий. Все уже произошло, даже если это было не так. (Т. Е. Паллету 123456 положили на грузовик A, но он был запланирован для грузовика B.) Затем позже факты проверяются на наличие исключений с помощью механизмов отчетности. Кафка, кажется, хорошо подходит для этого вида приложений обработки событий.

В этом контексте понятно, почему люди Kafka защищают его как решение для поиска событий. Потому что это очень похоже на то, как оно уже используется, например, в потоках кликов. Однако люди, использующие термин Event Sourcing (в отличие от Stream Processing), скорее всего, ссылаются на второе использование...

Контролируемый приложением источник правды = Нет

Приложение такого типа объявляет свои собственные события в результате запросов пользователей, проходящих через бизнес-логику. Кафка не работает в этом случае по двум основным причинам.

Отсутствие изоляции объекта

В этом сценарии требуется возможность загрузки потока событий для конкретной сущности. Распространенной причиной этого является построение модели переходной записи для бизнес-логики, используемой для обработки запроса. Делать это нецелесообразно в Кафке. Использование темы для каждой сущности может позволить это, за исключением того, что это не является началом, когда могут существовать тысячи или миллионы этой сущности. Это связано с техническими ограничениями в Kafka/Zookeeper. Вместо Kafka рекомендуется использовать топик для каждого типа, но для этого потребуется загрузка событий для каждого объекта этого типа, чтобы получить события для одного объекта. Поскольку вы не можете сказать по позиции журнала, какие события принадлежат к какому объекту. Даже используя моментальные снимки, чтобы начать с известной позиции в журнале, это может быть значительное количество событий, через которые можно пройти. Но снимки не могут помочь вам с изменениями кода. Потому что добавление новых функций в бизнес-логику может сделать предыдущие снимки структурно несовместимыми. Таким образом, для создания новой модели все еще необходимо повторить тему. Одна из основных причин использования модели временной записи вместо постоянной - это сделать изменения в бизнес-логике дешевыми и легкими в развертывании.

Отсутствие обнаружения конфликта

Во-вторых, пользователи могут создавать условия гонки из-за одновременных запросов к одному и тому же объекту. Может быть весьма нежелательно сохранять конфликтующие события и разрешать их по факту. Поэтому важно уметь предотвращать конфликтующие события. Чтобы масштабировать загрузку запроса, обычно используют службы без сохранения состояния, предотвращая конфликты записи с использованием условной записи (запись только в том случае, если последнее событие объекта было #x). Ака Оптимистичный Параллелизм. Кафка не поддерживает оптимистичный параллелизм. Даже если бы он поддерживал это на уровне темы, он должен был бы быть полностью вплоть до уровня сущности, чтобы быть эффективным. Чтобы использовать Kafka и предотвращать конфликтующие события, вам необходимо использовать сериализованную запись с сохранением состояния на уровне приложения. Это существенное архитектурное требование / ограничение.

Дальнейшая информация


Обновление за комментарий

Это было слишком большим, чтобы поместиться в комментарии. Кажется, что большинство людей катят свою собственную реализацию хранилища событий поверх существующей базы данных. Для нераспределенных сценариев, таких как внутренние серверные или автономные продукты, хорошо документировано, как создать хранилище событий на основе SQL. И есть библиотеки, доступные поверх различных видов баз данных. Существует также EventStore, который построен для этой цели.

В распределенных сценариях я видел несколько разных реализаций. Проект Jet Panther использует Azure CosmosDB с функцией Change Feed для уведомления слушателей. Еще одна похожая реализация, о которой я слышал в AWS, - это использование DynamoDB с функцией Streams для уведомления слушателей. Ключ раздела, вероятно, должен быть идентификатором потока для лучшего распределения данных (чтобы уменьшить объем избыточного выделения ресурсов). Однако полное воспроизведение через потоки в "Динамо" обходится дорого (для чтения и с точки зрения затрат). Так что это подразумевалось также для Dynamo Streams для выгрузки событий на S3. Когда новый слушатель подключается к сети, или существующий слушатель хочет получить полное воспроизведение, он будет читать S3, чтобы наверстать упущенное.

Мой текущий проект - мультитенантный сценарий, и я перевернул свой собственный поверх Postgres. Что-то вроде Citus кажется подходящим для масштабируемости, разделения по tentant + stream.

Кафка все еще очень полезна в распределенных сценариях. Нетривиальная проблема - выставлять события каждого сервиса другим сервисам. Хранилище событий обычно не создается для этого, но это именно то, что делает Кафка хорошо. Каждый сервис имеет собственный внутренний источник правды (может быть хранилище событий или другое), но слушает Кафку, чтобы узнать, что происходит "снаружи". Команда также может публиковать свои служебные события в Кафке, чтобы проинформировать "извне" об интересных вещах, которые сделала служба.

Все существующие ответы кажутся довольно исчерпывающими, но есть проблема с терминологией, которую я хотел бы решить в своем ответе.

Что такое источник событий?

Кажется, если вы посмотрите на пять разных мест, вы получите пять разных ответов на этот вопрос.

Тем не менее, если вы посмотрите на статью Грега Янга от 2010 года, вы увидите, что она очень хорошо резюмирует идею, начиная со страницы 32, но не содержит окончательного определения, поэтому я осмеливаюсь сформулировать ее самостоятельно.

Event Sourcing — это способ сохранения состояния. Вместо замены одного состояния другим в результате мутации состояния вы сохраняете событие, представляющее эту мутацию. Таким образом, вы всегда можете получить текущее состояние сущности, прочитав все события сущности и последовательно применив эти мутации состояния. Таким образом, текущее состояние объекта становится левым сгибом всех событий для этого объекта.

Что значит «хорошее» хранилище событий (база данных)?

Любой механизм сохранения должен выполнять две основные операции:

  • Сохраните новое состояние объекта в базе данных
  • Получить состояние объекта из базы данных

Именно здесь Грег говорит о концепции потоков сущностей , где каждая сущность имеет свой собственный поток событий, однозначно идентифицируемый идентификатором сущности. Когда у вас есть база данных, способная считывать все события сущностей по идентификатору сущности (читать поток), использование Event Sourcing не является сложной проблемой.

Поскольку в статье Грега источник событий упоминается в контексте CQRS, он объясняет, почему эти две концепции хорошо сочетаются друг с другом. Несмотря на то, что у вас есть база данных, полная мутаций атомарного состояния для множества сущностей, запрос текущего состояния нескольких сущностей — тяжелая работа. Проблема решается путем разделения транзакционного (событийного) хранилища, которое используется в качестве источника достоверной информации, и хранилища отчетов (запросов, чтения), которое используется для отчетов и запросов о текущем состоянии системы для нескольких сущностей. Хранилище запросов не содержит никаких событий, оно содержит спроецированныесостояние нескольких сущностей, составленное на основе потребностей в запросе данных. Он не обязательно должен содержать снимки каждой сущности, вы можете свободно выбирать форму и форму модели запроса, если вы можете проецировать свои события на эту модель.

По этой причине «правильная» база данных событий должна поддерживать то, что мы называем подписками в реальном времени, которые будут доставлять новые (и исторические, если нам нужно воспроизвести) события в модель запроса для проецирования.

Мы также знаем, что нам нужно иметь состояние объекта при принятии решений о разрешенном переходе состояния. Например, уже выполненный денежный перевод не должен выполняться дважды. Поскольку модель запроса устарела по определению (даже на миллисекунды), она становится опасной, когда вы принимаете решения на основе устаревших данных. Поэтому мы используем самое последнее и полностью согласованное состояние из хранилища транзакций (событий) для восстановления состояния объекта при выполнении операций над объектом.

Иногда вы также хотите удалить весь объект из базы данных, что означает удаление всех его событий. Это может быть требование, например, соответствия GDPR.

Итак, какие атрибуты тогда потребуются для базы данных, используемой в качестве хранилища событий, чтобы обеспечить достойную работу системы, основанной на событиях? Немного:

  • Добавлять события в упорядоченный журнал только для добавления, используя идентификатор объекта в качестве ключа.
  • Загрузите все события для одного объекта в упорядоченной последовательности, используя идентификатор объекта в качестве ключа.
  • Удалить все события для данного объекта, используя идентификатор объекта в качестве ключа
  • Поддержка подписки в реальном времени на события проекта для моделей запросов.

Что такое Кафка?

Kafka — это высокомасштабируемый брокер сообщений, основанный на журнале только для добавления. Сообщения в Kafka создаются по темам, и в настоящее время одна тема часто содержит сообщения одного типа, что хорошо сочетается с реестром схем. Темой может быть что-то вроде загрузки ЦП, где мы производим временные ряды измерений загрузки ЦП для многих серверов.

Темы Kafka могут быть разделены. Разделение позволяет создавать и потреблять сообщения параллельно. Сообщения упорядочиваются только в пределах одного раздела, и обычно вам нужно использовать предсказуемый ключ раздела, поэтому Kafka может распределять сообщения по разделам.

Теперь давайте пройдемся по контрольному списку:

  • Можете ли вы добавить события в Kafka? Да, это называется производить. Можете ли вы добавить события с идентификатором объекта в качестве ключа? Не совсем так, поскольку ключ раздела используется для распределения сообщений по разделам, так что на самом деле это просто ключ раздела. Одна вещь, упомянутая в другом ответе, - это оптимистичный параллелизм. Если вы работали с реляционной базой данных, вы, вероятно, использовали Versionстолбец. Для баз данных NoSQL вы могли использовать eTag документа. Оба позволяют вам гарантировать, что вы обновляете сущность, находящуюся в известном вам состоянии, и что она не была видоизменена во время вашей операции. Kafka не предоставляет ничего для поддержки оптимистичного параллелизма для таких переходов между состояниями.
  • Можете ли вы прочитать все события для одного объекта из темы Kafka, используя идентификатор объекта в качестве ключа? Нет, ты не можешь . Поскольку Kafka не является базой данных, у нее нет индекса по темам, поэтому единственный способ получить сообщения из темы — это использовать их.
  • Можете ли вы удалить события из Kafka, используя идентификатор объекта в качестве ключа? Нет , это невозможно. Сообщения удаляются из темы только по истечении срока их хранения.
  • Можете ли вы подписаться на тему Kafka, чтобы получать живые (и исторические) события по порядку, чтобы вы могли проецировать их на свои модели запросов? Да , и поскольку темы разбиты на разделы, вы можете масштабировать свои прогнозы для повышения производительности.

Итак, почему люди продолжают это делать?

Я считаю, что причина, по которой многие люди утверждают, что Kafka — хороший выбор в качестве хранилища событий для систем с источниками событий, заключается в том, что они путают Event Sourcing с простым pub-sub (вы можете использовать рекламное слово «EDA» или вместо него Архитектура, управляемая событиями). Использование брокеров сообщений для разветвления событий на другие системные компоненты — это схема, известная уже несколько десятилетий. Проблема с «классическими» брокерами, поскольку сообщения исчезают, как только они потребляются, поэтому вы не можете построить что-то вроде модели запроса, которая будет построена из истории. Другая проблема заключается в том, что при проецировании событий вы хотите, чтобы они потреблялись в том же порядке, в котором они создаются, а «классические» брокеры обычно стремятся поддерживать шаблон конкурирующих потребителей, который по определению не поддерживает упорядоченную обработку сообщений. Не ошибись,поддержки конкурирующих потребителей, он имеет ограничение в один потребитель на один или несколько разделов, но не наоборот. Kafka довольно хорошо решил проблему упорядочения и хранения исторических сообщений. Итак, теперь вы можете создавать модели запросов на основе событий, которые вы отправляете через Kafka. Но первоначальная идея Event Sourcing заключается не в этом, а в том, что мы сегодня называем EDA. Как только это разделение станет ясным, мы, надеюсь, перестанем видеть заявления о том, что любой журнал событий, предназначенный только для добавления, является хорошим кандидатом на роль базы данных хранилища событий для систем с источниками событий.

Вы можете использовать Kafka в качестве хранилища событий, но я не рекомендую делать это, хотя это может показаться хорошим выбором:

  • Кафка гарантирует только один раз доставку, и в хранилище событий есть дубликаты, которые нельзя удалить. Обновление: здесь вы можете прочитать, почему так сложно с Кафкой, и некоторые последние новости о том, как, наконец, добиться этого поведения: https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
  • Из-за неизменности, нет никакого способа манипулировать хранилищем событий, когда приложение развивается и события должны быть преобразованы (существуют, конечно, такие методы, как апкастинг, но...). Однажды может быть сказано, что вам никогда не нужно преобразовывать события, но это неверное предположение, может быть ситуация, когда вы делаете резервную копию оригинала, но вы обновляете их до последних версий. Это действительное требование в управляемых событиями архитектурах.
  • Нет места для сохранения снимков сущностей / агрегатов, и воспроизведение будет становиться все медленнее и медленнее. Создание снимков должно быть характерно для хранилища событий в долгосрочной перспективе.
  • С учетом того, что разделы Кафки распределены и ими сложно управлять и их резервное копирование сравнить с базами данных. Базы данных просто проще:-)

Поэтому, прежде чем сделать свой выбор, подумайте дважды. Хранилище событий в виде комбинации интерфейсов прикладного уровня (мониторинг и управление), хранилище SQL/NoSQL и Kafka в качестве брокера - лучший выбор, чем предоставление Kafka обеих функций для создания полноценного полнофункционального решения.

Хранилище событий - это сложный сервис, который требует больше, чем может предложить Kafka, если вы серьезно относитесь к использованию источников событий, CQRS, Sagas и других шаблонов в архитектуре, управляемой событиями, и сохраняете высокую производительность.

Не стесняйтесь оспаривать мой ответ! Возможно, вам не понравится то, что я говорю о вашем любимом брокере с множеством перекрывающихся возможностей, но, тем не менее, Kafka не был разработан как хранилище событий, а больше как высокопроизводительный брокер и буфер одновременно для обработки быстрых производителей по сравнению со сценариями с медленными потребителями, например.

Пожалуйста, посмотрите на фреймворк с открытым исходным кодом для микросервисов eventuate.io, чтобы узнать больше о потенциальных проблемах: http://eventuate.io/

Обновление от 8 февраля 2018

Я не включаю новую информацию из комментариев, но согласен с некоторыми из этих аспектов. Это обновление больше о некоторых рекомендациях для управляемой событиями платформы микросервиса. Если вы серьезно относитесь к надежному микросервисному дизайну и максимально возможной производительности в целом, я дам вам несколько советов, которые могут вас заинтересовать.

  1. Не используйте Spring - это здорово (я сам часто его использую), но оно тяжелое и медленное одновременно. И это совсем не микросервисная платформа. Это "просто" фреймворк, чтобы помочь вам реализовать его (много работы за этим..). Другие фреймворки - это "просто" облегченные REST или JPA или фреймворки с различной ориентацией. Я рекомендую, вероятно, лучшую в своем классе полную микросервисную платформу с открытым исходным кодом, которая возвращается к чистым корням Java: https://github.com/networknt

Если вас интересует производительность, вы можете сравнить себя с существующим набором тестов. https://github.com/networknt/microservices-framework-benchmark

  1. Ни в коем случае не используйте Кафку:-)) Это наполовину шутка. Я имею в виду, хотя Кафка великолепна, это еще одна система, ориентированная на брокеров. Я думаю, что будущее за системами обмена сообщениями без посредников. Вы можете быть удивлены, но есть более быстрые, чем системы Kafka:-), конечно, вы должны перейти на более низкий уровень. Посмотри Хронику.

  2. Для хранилища событий я рекомендую улучшенное расширение Postgresql под названием TimescaleDB, которое фокусируется на высокопроизводительной обработке данных временных рядов (события - временных рядов) в большом объеме. Конечно, CQRS, Event Sourcing (функции воспроизведения и т. Д.) Встроены в фреймворк light4j из коробки, который использует Postgres в качестве места для хранения.

  3. Для обмена сообщениями попробуйте взглянуть на Chronicle Queue, Map, Engine, Network. Я имею в виду избавиться от этого старомодного решения, ориентированного на брокера, и перейти на систему микро-сообщений (встроенную). Хроника очереди на самом деле даже быстрее, чем Кафка. Но я согласен, что это не все в одном решении, и вам нужно заняться разработкой, иначе вы пойдете и купите версию Enterprise (платную). В конце концов, усилия по созданию из Chronicle собственного уровня обмена сообщениями будут оплачены за счет снятия бремени обслуживания кластера Kafka.

Да, вы можете использовать Kafka в качестве магазина событий. Он работает довольно хорошо, особенно с введением Kafka Streams, который предоставляет нативный Kafka способ перевести ваши события в накопленное состояние, к которому вы можете обращаться.

Что касается:

Возможность воспроизведения журнала событий, что позволяет новым подписчикам регистрироваться в системе после факта.

Это может быть сложно. Я подробно рассказал об этом здесь: /questions/47845478/apache-kafka-vosproizvedenie-soobschenij-v-teme/47845494#47845494

Я думаю, вам стоит посмотреть на фреймворк аксонов вместе с их поддержкой Kafka.

Если вы хотите быстро проверить код для потоковой передачи Kafka при весенней загрузке. Мне нравится, как Spring Kafka упрощает работу https://github.com/srjha/kfk-stream

https://github.com/srjha/sp-kafka-mservice

https://github.com/srjha/kafka-rest

Да, Kafka хорошо работает в модели источников событий, особенно в CQRS, однако вы должны позаботиться о настройке TTL для тем и всегда помнить, что Kafka не был разработан для этой модели, однако мы можем очень хорошо ее использовать.

Другие вопросы по тегам