Как преобразовать существующий класс в агрегат DDD с событиями?
У меня есть следующий класс, который используется для выполнения торговых операций по покупке и продаже подписок. Я хотел бы преобразовать этот класс, чтобы он мог использоваться в микросервисе с использованием источников событий и, возможно, CQRS. Идея, которая у меня возникла, заключается в том, что она будет жить внутри Service Fabric Actor, где этот класс будет полностью в памяти.
public class OrderBook
{
public const int ScaleFactor = 10_000;
private long _orderId = 1;
public OrderBook()
{
Limits = new RankedSet<Limit>(new LimitPriceComparer()) { new Limit { Price = 1 * ScaleFactor } };
Subscriptions = new Dictionary<long, Subscription>();
Orders = new Dictionary<long, Order>();
}
private RankedSet<Limit> Limits { get; }
private IDictionary<long, Subscription> Subscriptions { get; }
private IDictionary<long, Order> Orders { get; }
public Order Ask(long userId, long price, int shares)
{
if (userId <= 0 || price <= 0 || shares <= 0)
{
// TODO: Return a message or something.
return null;
}
price = price * ScaleFactor;
// Get the users subscription.
if (!Subscriptions.TryGetValue(userId, out Subscription subscription))
{
// TODO: Return a message or something.
return null;
}
var index = Limits.Count - 1;
var originalShares = shares;
while (index >= 0 && shares > 0)
{
var currentLimit = Limits.ElementAt(index);
if (currentLimit.Price < price)
{
break;
}
Order order = currentLimit.BidHead;
while (order != null && shares > 0)
{
if (order.Subscription.UserId == userId)
{
if (order.Next == null)
{
break;
}
else
{
order = order.Next;
}
}
// Always assume the bid will have a subscription even if it's empty.
if (order.Shares >= shares)
{
order.Subscription.Owned += shares;
order.Shares -= shares;
shares = 0;
}
else
{
order.Subscription.Owned += order.Shares;
shares -= order.Shares;
order.Shares = 0;
}
order = order.Next;
}
index--;
}
if (shares > 0)
{
subscription.Owned -= originalShares - shares;
var newOrder = new Order { Id = /*Interlocked.Increment(ref _orderId)*/_orderId++, Shares = shares, Subscription = subscription };
// At this point Limits is guaranteed to have a single Limit.
var prevLimit = Limits.ElementAt(index == Limits.Count - 1 ? index : ++index);
if (prevLimit.Price == price)
{
newOrder.ParentLimit = prevLimit;
if (prevLimit.AskHead == null)
{
prevLimit.AskHead = newOrder;
}
else
{
newOrder.Next = prevLimit.AskHead;
prevLimit.AskHead.Prev = newOrder;
prevLimit.AskHead = newOrder;
}
}
else
{
var newLimit = new Limit { AskHead = newOrder, Price = price };
newOrder.ParentLimit = newLimit;
Limits.Add(newLimit);
}
Orders.Add(newOrder.Id, newOrder);
return newOrder;
}
else
{
subscription.Owned -= originalShares;
}
return null;
}
}
Вот начало того, что я думаю, будет выглядеть преобразование в совокупность. Проблема, с которой я сталкиваюсь, заключается в том, что при возникновении TradeExecutedEvent необходимо изменить состояние агрегата в целом. Другими словами, если это событие было запущено само по себе, это не имело бы смысла, поскольку оно зависело от событий, которые предшествуют этому. Единственная причина, по которой я думал, что мне нужен TradeExecutedEvent, - это уведомить пользовательский интерфейс о том, что их сделка была выполнена.
Будет ли лучше хранить TradeExecutedEvent в Event Store, но просто не иметь соответствующего метода Apply, чтобы другие службы / подписчики могли получать уведомления о совершении сделки?
Мне кажется, я думал об этом совершенно неправильно, так как полагаю, что агрегаты являются временными и недолговечными, как этот. Буду признателен за любые предложения или рекомендации.
public class TradeAggregate : AggregateBase
{
private const int ScaleFactor = 10_000;
private RankedSet<Limit> Limits { get; }
private IDictionary<long, Subscription> Subscriptions { get; }
private IDictionary<long, Order> Orders { get; }
public TradeAggregate(string asset)
{
Limits = new RankedSet<Limit>(new LimitPriceComparer()) { new Limit { Price = 1 * ScaleFactor } };
Subscriptions = new Dictionary<long, Subscription>();
Orders = new Dictionary<long, Order>();
}
public void Ask(long userId, long price, int shares)
{
if (userId <= 0 || price <= 0 || shares <= 0)
{
// TODO: Return a message or something.
return;
}
price = price * ScaleFactor;
if (!Subscriptions.TryGetValue(userId, out Subscription subscription))
{
throw new System.Exception("You do not own this subscription.");
}
RaiseEvent(new AskOrderPlacedEvent(subscription, price, shares));
}
public void Apply(AskOrderPlacedEvent e)
{
var index = Limits.Count - 1;
var shares = e.Shares;
while (index >= 0 && shares > 0)
{
var currentLimit = Limits.ElementAt(index);
if (currentLimit.Price < e.Price)
{
break;
}
Order order = currentLimit.BidHead;
while (order != null && shares > 0)
{
if (order.Subscription.UserId == e.Subscription.UserId)
{
if (order.Next == null)
{
break;
}
else
{
order = order.Next;
}
}
// Always assume the bid will have a subscription even if it's empty.
if (order.Shares >= shares)
{
RaiseEvent(new TradePartiallyExecutedEvent(order, shares, e.Subscription, e.Shares));
shares = 0;
}
else
{
RaiseEvent(new TradeExecutedEvent(order, shares, e.Subscription, e.Shares));
shares -= order.Shares;
}
order = order.Next;
}
index--;
}
if (shares > 0)
{
// .... etc.
}
else
{
// .... etc.
}
}
public void Apply(TradePartiallyExecutedEvent e)
{
e.Order.Subscription.Owned += e.Shares;
e.Order.Shares -= e.Shares;
e.Subscription.Owned -= e.OriginalShares - e.Shares;
}
public void Apply(TradeExecutedEvent e)
{
e.Order.Subscription.Owned += e.Order.Shares;
e.Order.Shares = 0;
e.Subscription.Owned -= e.OriginalShares;
}
}
1 ответ
Если я не понимаю вас, то вы пытаетесь сделать так, чтобы агрегат реагировал на событие, вызванное им самим.
Это не имеет смысла. Агрегат является транзакционно согласованным, поэтому вы можете выполнять все операции за одну транзакцию без использования событий.
События предназначены для возможной согласованности между различными агрегатами одного и того же BC или между двумя BC, поскольку в транзакции изменяется только состояние одного агрегата. Агрегат должен сообщить остальному миру за пределами своей транзакционной границы, что что-то произошло, вызвав событие.
Я думаю, что, возможно, вам стоит взглянуть на свой домен, чтобы проверить, нужно ли разбивать совокупность на несколько. Если это так, используйте события для их асинхронной передачи. В противном случае (имея только один агрегат) вам не нужно прослушивать события, просто вызовите их, чтобы сохранить их в хранилище событий.