Как применить EventBus CQRS с Akka.net и Akka.Persistence
Меня интересует, как применять CQRS и Event Sourcing с Akka.net. Я уже заметил http://getakka.net/docs/Persistence, которое поставляет часть ES.
Насколько я понимаю, обработчики команд и AggergateRoot могут быть представлены ReceivePersistentActor в одном классе. У меня есть базовый класс для этого (не полный здесь)...
public abstract class AggregateRoot<TState> : ReceivePersistentActor
{
private readonly Func<TState, bool> shouldSafeSnapShot;
protected AggregateRoot(TState state, Func<TState, bool> shouldSafeSnapShot)
{
if (state == null) throw new ArgumentNullException(nameof(state));
if (shouldSafeSnapShot == null) throw new ArgumentNullException(nameof(shouldSafeSnapShot));
var path = this.Self.Path;
this.PersistenceId = $"{path.Parent.Name}/{path.Name}";
this.State = state;
this.shouldSafeSnapShot = shouldSafeSnapShot;
}
public sealed override string PersistenceId { get; }
protected TState State { get; }
protected void Emit<TEvent>(TEvent e, Action<TEvent> apply = null)
{
this.Persist(e,
@event =>
{
// update state
apply?.Invoke(@event);
// safe snapshot or so...
this.SaveSnapshotIfRequired();
// publish to event bus?
Context.System.EventStream.Publish(e);
});
}
private void SaveSnapshotIfRequired()
{
var state = this.State;
if (this.shouldSafeSnapShot(state))
{
this.SaveSnapshot(state);
}
}
}
Не хочу, чтобы событие отправлялось через EventBus, и Akka, кажется, вносит в свой пакет что-то, что может соответствовать тому, как это делается с другими средами CQRS (абстракции EventBus в NCqrs, Inceptum) или в простом примере Грегори Янга здесь.
// publish to event bus?
Context.System.EventStream.Publish(e);
Тем не менее, мне кажется странным знать реализацию EventBus внутри субъекта объекта домена AggregateRoot. Я мог бы возложить ответственность на потомка актера или внедрить что-то вроде интерфейсаIAbstractEventBus, который используется для переключения реализаций, но я не думаю, что AggregateRoot должен публиковать событие после сохранения и информировать всех подписчиков, верно!? Но часть ES, кажется, связана с Актером.
Какой типичный способ сделать это с Akka.net и Akka.Persistence? Любые идеи дизайна, как это разделить?
Я наткнулся на интерфейс IEventAdapter Akka.net, но я не уверен, что это приведет меня к правильному пути (или темной стороне)...
public class SomeEventAdapter : IEventAdapter
{
public object ToJournal(object evt)
{
return evt; // should I add the event emitter here?
// but is this already stored now? hmmm
}
}
2 ответа
В идеалистическом мире ваш командный обработчик может позаботиться о отслеживании событий, чтобы передать их на шину событий и сохранить их в той или иной форме, когда команда полностью обработана без каких-либо проблем - это гарантирует, что вы не будете преждевременно отправлять событие до Метод агрегата завершил получение всех событий, которые он должен.
В Akka.net вы могли бы изменить сообщение "Задать вопрос" при передаче команды в агрегат (при условии, что ваши обработчики отделены от агрегатов ради чистоты), и агрегат должен возвращать события, которые он хочет вызвать и сохранить обратно. обработчик, поэтому обработчик может затем отправлять и сохранять их (возможно, косвенно сам... возможно, через какую-то форму на UOW)... это, однако, означает, что никакая другая команда, обработчик которой сконфигурирован для обработки, не может быть обработана, пока ваш агрегат закончил... хотя другие агрегаты, вероятно, обслуживаются одним обработчиком - Спросите блокировки в конце концов... Скажите нет.
В некоторых других системах сам уровень постоянства может действовать как шина событий. EventStore Грега Янга является одним из них... вы полностью избавляетесь от своих проблем).
Это задача идеалистической теории против реализации в реальном мире и ограничений структуры.
В идеалистическом мире совокупный корень НЕ занимается хранением событий, которые он вызывает... его цель (некоторые скажут) состоит в том, чтобы гидрировать себя из предоставленной коллекции событий, инициировать события и заставить другие компоненты управлять широковещательной передачей, постоянством и запросами (для увлажнения) тех событий. Я знаю, что Akka.net предоставляет своим участникам действующий уровень доступа к БД / персистентности БД: что неплохо... но не соответствует чистой реализации SOLID CQRS. Таким образом, вы можете начать мутить воду через вашу реализацию, где вы находитесь.
Это далеко от идеала, но один вариант
protected virtual void Execute<TCommand>(Action<TCommand> action, TCommand command)
{
UnitOfWork.Add(this);
try
{
action(command);
UnitOfWork.Commit();
Sender.Tell(true, Self);
}
catch(Exception exception)
{
Logger.LogError("Executing an Akka.net request failed.", exception: exception);
Sender.Tell(false, Self);
throw;
}
}
который затем может быть подключен через
Receive<SayHelloWorldCommand>(command => Execute(SayHello, command));
Он взят из cqrs.net и использует единицу работы (UOW) в агрегате, а не передает его обратно обработчику. Это компромисс, который они сделали для того, чтобы, по крайней мере, напрямую исключить вышеуказанные проблемы из агрегата и придерживаться некоторой формы реализации SOLID.
@Beachwalker У меня есть реализация с использованием подхода CQRS по адресу https://github.com/thangchung/magazine-website-akka. Я не знаю, это может вам помочь или нет? Но надеюсь, что вы найдете что-то полезное. Я продолжаю добавлять больше функций и возможностей для этого.