nservicebus и магазин событий
Мне интересно, сталкивался ли кто-нибудь с этим раньше:
Я обрабатываю команду и в обработчике сохраняю событие в хранилище событий (joliver).
Сразу после отправки обработчик для той же команды снова обрабатывается.
Я знаю, что это та же команда, потому что руководство по команде то же самое.
После пяти попыток nservicebus сообщает, что команда не выполнена из-за максимальных попыток.
Очевидно, что команда завершилась неудачно, но я не получаю никаких сведений о том, что не удалось. Я поместил содержимое диспетчера в ловушку try, но ошибки не обнаружены. После того, как код выйдет из диспетчера, обработчик событий всегда будет запускаться, как будто что-то не так.
При прохождении кода события сохраняются в базе данных (я вижу строку), диспетчер запускается, а для столбца Dispatched устанавливается значение true, а затем обработчик снова обрабатывает команду, процесс повторяется, и вставляется другая строка. в таблицу коммитов.
Что может быть неудачным? Я не устанавливаю флаг успеха где-нибудь в хранилище событий? Если я отделю хранилище событий от nServicebus, оба будут работать так, как ожидается, без повторов и сбоев.
Диспетчер:
public void Dispatch(Commit commit)
{
for (var i = 0; i < commit.Events.Count; i++)
{
try
{
var eventMessage = commit.Events[i];
var busMessage = (T)eventMessage.Body;
//bus.Publish(busMessage);
}
catch (Exception ex)
{
throw ex;
}
}
}
Wireup.Init()
private static IStoreEvents WireupEventStore()
{
return Wireup.Init()
.LogToOutputWindow()
.UsingSqlPersistence("EventStore")
.InitializeStorageEngine()
.UsingBinarySerialization()
//.UsingJsonSerialization()
// .Compress()
//.UsingAsynchronousDispatchScheduler()
// .DispatchTo(new NServiceBusCommitDispatcher<T>())
.UsingSynchronousDispatchScheduler()
.DispatchTo(new DelegateMessageDispatcher(DispatchCommit))
.Build();
}
1 ответ
У меня была открыта область транзакции в сохранении, которое я никогда не закрывал.
public static void Save(AggregateRoot root)
{
// we can call CreateStream(StreamId) if we know there isn't going to be any data.
// or we can call OpenStream(StreamId, 0, int.MaxValue) to read all commits,
// if no commits exist then it creates a new stream for us.
using (var scope = new TransactionScope())
using (var eventStore = WireupEventStore())
using (var stream = eventStore.OpenStream(root.Id, 0, int.MaxValue))
{
var events = root.GetUncommittedChanges();
foreach (var e in events)
{
stream.Add(new EventMessage { Body = e });
}
var guid = Guid.NewGuid();
stream.CommitChanges(guid);
root.MarkChangesAsCommitted();
scope.Complete(); // <-- missing this
}
}