NServiceBus и NHibernate EventListeners, работающие в разных потоках
Я пытаюсь добиться того, чтобы мой веб-сайт вызывал сообщение и помещал его в шину, служба забирает его и записывает в базу данных с помощью аудита, который автоматически заполняет поле AddedBy/ updatedBy строки.
Я делаю это с помощью компонента NServiceBus IMessageMutator, который записывает идентификатор пользователя в заголовки сообщений из Thread.CurrentPrincipal, который поступает от зарегистрированного пользователя в моем приложении ASP.Net. В моем сервисе я использую IMessageModule для извлечения этого заголовка и связывания его с Thread.CurrentPrincipal. Это прекрасно работает, и во время моего обработчика сообщений я вижу, что Thread.CurrentPrincipal.Identity.Name правильно связан с идентификатором пользователя, который вызвал сообщение в веб-приложении.
Используя IPreUpdateEventListener/IPreInsertEventListener NHibernate Я устанавливаю AddedBy/ updatedBy каждой сущности до ее записи в БД. Это отлично работает на веб-сайте, но в моей службе NServiceBus поток, в котором работает слушатель, отличается от потока, в котором работал обработчик, что означает, что CurrentPrincipal потока больше не является идентификатором, привязанным в моем IMessageModule.
Я вижу, что NHibernate использует DistributedTransactionFactory в стеке вызовов, что, как я подозреваю, является причиной моей проблемы. Я не хочу потерять транзакционность, чтобы в случае неудачной фиксации сообщение не повторялось и не помещалось в очередь ошибок, а также если удаление сообщения из очереди завершилось неудачно и обновление не откатилось до БД.
Я просмотрел сеть, и во всех примерах используется CurrentPrincipal потока для привязки идентификатора пользователя, который изменил строки. То, что я ищу, - это способ сохранить слушатель NHibernate в том же потоке, что и обработчик сообщений, или передать идентификатор пользователя слушателю, чтобы он мог быть привязан к объекту до его записи в БД.
Вот мой слушатель, я опустил найденный в нем метод Set
public class EntityPersistenceListener : IPreUpdateEventListener, IPreInsertEventListener
{
public bool OnPreUpdate(PreUpdateEvent @event)
{
var audit = @event.Entity as EntityBase;
if (audit == null)
return false;
var time = DateTimeFactory.GetDateTime();
var name = Thread.CurrentPrincipal.Identity.Name;
Set(@event.Persister, @event.State, "AddedDate", audit.AddedDate);
Set(@event.Persister, @event.State, "AddedBy", audit.AddedBy);
Set(@event.Persister, @event.State, "UpdatedDate", time);
Set(@event.Persister, @event.State, "UpdatedBy", name);
audit.AddedDate = audit.AddedDate;
audit.AddedBy = audit.AddedBy;
audit.UpdatedDate= time;
audit.UpdatedBy = name;
return false;
}
}
А вот модуль сообщений NServiceBus, который извлекает идентификатор и привязывает его к идентификатору текущего потока
public class TenantAndInstanceInfoExtractor : IMessageModule
{
private readonly IBus _bus;
public TenantAndInstanceInfoExtractor(IBus bus)
{
_bus = bus;
}
public void HandleBeginMessage()
{
var headers = _bus.CurrentMessageContext.Headers;
if (headers.ContainsKey("TriggeredById"))
Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(headers["TriggeredById"]), null);
else
Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(string.Empty), null);
}
public void HandleEndMessage()
{
}
public void HandleError() { }
}
1 ответ
Спасибо Саймон за всю вашу помощь. После тщательного изучения моей проблемы и обсуждения внутренней работы NServiceBus я понял вашу идею и реализовал модуль единицы работы для NServiceBus.
Происходило то, что мы полагались на транзакцию, созданную для каждого сообщения, чтобы зафиксировать наш сеанс NHibernate в БД. Это происходит на контроллере распределенных транзакций (в частности, здесь NHibernate.Transaction.AdoNetWithDistributedTransactionFactory.DistributedTransactionContext), который использует пул потоков.
Используя интерфейс IManageUnitsOfWork NServiceBus, я смог явно зафиксировать нашу транзакцию в том же потоке, что и обработчик сообщений, как показано ниже в примере кода.
Как примечание для будущих читателей, лучшее решение здесь - не использовать Thread.CurrentPrincipal, так как это решение не работает в многопоточных средах, как для меня.
public class HiJumpNserviceBusUnitOfWork : IManageUnitsOfWork
{
private readonly IUnitOfWork _uow;
public HiJumpNserviceBusUnitOfWork(IUnitOfWork uow)
{
_uow = uow;
}
public void Begin()
{
_uow.ClearCache();
_uow.BeginTransaction();
}
public void End(Exception ex = null)
{
if (ex != null)
{
_uow.ClearCache();
}
else
{
_uow.CommitTransaction();
}
}
}