NServiceBus и NHibernate EventListeners, работающие в разных потоках

Я пытаюсь добиться того, чтобы мой веб-сайт вызывал сообщение и помещал его в шину, служба забирает его и записывает в базу данных с помощью аудита, который автоматически заполняет поле AddedBy/ updatedBy строки.

Я делаю это с помощью компонента NServiceBus IMessageMutator, который записывает идентификатор пользователя в заголовки сообщений из Thread.CurrentPrincipal, который поступает от зарегистрированного пользователя в моем приложении ASP.Net. В моем сервисе я использую IMessageModule для извлечения этого заголовка и связывания его с Thread.CurrentPrincipal. Это прекрасно работает, и во время моего обработчика сообщений я вижу, что Thread.CurrentPrincipal.Identity.Name правильно связан с идентификатором пользователя, который вызвал сообщение в веб-приложении.

Используя IPreUpdateEventListener/IPreInsertEventListener NHibernate Я устанавливаю AddedBy/ updatedBy каждой сущности до ее записи в БД. Это отлично работает на веб-сайте, но в моей службе NServiceBus поток, в котором работает слушатель, отличается от потока, в котором работал обработчик, что означает, что CurrentPrincipal потока больше не является идентификатором, привязанным в моем IMessageModule.

Я вижу, что NHibernate использует DistributedTransactionFactory в стеке вызовов, что, как я подозреваю, является причиной моей проблемы. Я не хочу потерять транзакционность, чтобы в случае неудачной фиксации сообщение не повторялось и не помещалось в очередь ошибок, а также если удаление сообщения из очереди завершилось неудачно и обновление не откатилось до БД.

Я просмотрел сеть, и во всех примерах используется CurrentPrincipal потока для привязки идентификатора пользователя, который изменил строки. То, что я ищу, - это способ сохранить слушатель NHibernate в том же потоке, что и обработчик сообщений, или передать идентификатор пользователя слушателю, чтобы он мог быть привязан к объекту до его записи в БД.

Вот мой слушатель, я опустил найденный в нем метод Set

public class EntityPersistenceListener : IPreUpdateEventListener, IPreInsertEventListener
{
    public bool OnPreUpdate(PreUpdateEvent @event)
    {
        var audit = @event.Entity as EntityBase;
        if (audit == null)
            return false;

        var time = DateTimeFactory.GetDateTime();

        var name = Thread.CurrentPrincipal.Identity.Name;

        Set(@event.Persister, @event.State, "AddedDate", audit.AddedDate);
        Set(@event.Persister, @event.State, "AddedBy", audit.AddedBy);
        Set(@event.Persister, @event.State, "UpdatedDate", time);
        Set(@event.Persister, @event.State, "UpdatedBy", name);

        audit.AddedDate = audit.AddedDate;
        audit.AddedBy = audit.AddedBy;
        audit.UpdatedDate= time;
        audit.UpdatedBy = name;

        return false;            
    }
}

А вот модуль сообщений NServiceBus, который извлекает идентификатор и привязывает его к идентификатору текущего потока

public class TenantAndInstanceInfoExtractor : IMessageModule
{
    private readonly IBus _bus;

    public TenantAndInstanceInfoExtractor(IBus bus)
    {
        _bus = bus;
    }

    public void HandleBeginMessage()
    {
        var headers = _bus.CurrentMessageContext.Headers;

        if (headers.ContainsKey("TriggeredById"))
            Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(headers["TriggeredById"]), null);
        else
            Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(string.Empty), null);
    }

    public void HandleEndMessage()
    {

    }

    public void HandleError() { }
}

1 ответ

Решение

Спасибо Саймон за всю вашу помощь. После тщательного изучения моей проблемы и обсуждения внутренней работы NServiceBus я понял вашу идею и реализовал модуль единицы работы для NServiceBus.

Происходило то, что мы полагались на транзакцию, созданную для каждого сообщения, чтобы зафиксировать наш сеанс NHibernate в БД. Это происходит на контроллере распределенных транзакций (в частности, здесь NHibernate.Transaction.AdoNetWithDistributedTransactionFactory.DistributedTransactionContext), который использует пул потоков.

Используя интерфейс IManageUnitsOfWork NServiceBus, я смог явно зафиксировать нашу транзакцию в том же потоке, что и обработчик сообщений, как показано ниже в примере кода.

Как примечание для будущих читателей, лучшее решение здесь - не использовать Thread.CurrentPrincipal, так как это решение не работает в многопоточных средах, как для меня.

public class HiJumpNserviceBusUnitOfWork : IManageUnitsOfWork
{
    private readonly IUnitOfWork _uow;

    public HiJumpNserviceBusUnitOfWork(IUnitOfWork uow)
    {
        _uow = uow;
    }

    public void Begin()
    {
        _uow.ClearCache();
        _uow.BeginTransaction();
    }

    public void End(Exception ex = null)
    {
        if (ex != null)
        {
            _uow.ClearCache();
        }
        else
        {
            _uow.CommitTransaction();
        }
    }
}
Другие вопросы по тегам