Rhino ESB Опубликовать сообщение для себя

Использование Rhino Service Bus. У меня есть бэкэнд-приложение, которое обрабатывает обработку, и есть другое приложение (клиентский интерфейс), который публикует сообщения на бэкэнд. У меня есть Saga на бэк-энде, во время которого я хочу, чтобы сага публиковала сообщения для себя, разбивая обработку на несколько небольших задач, которые могут выполняться в своих собственных потоках. Проблема в том, что сообщения всегда отбрасываются, если они подписаны через интерфейс Orchestrates. Я могу подписаться в другом классе, используя ConsumerOf, и потребитель получит сообщение.

namespace Sagas
{
public class MoveJobSaga: ISaga<MoveJobState>,
    InitiatedBy<TriggerMoveJobCommand>,
    Orchestrates<TriggerMoveTerminalCommand>
{
    private readonly IServiceBus _bus;
    private readonly ITerminalFilesService _tfService;

    public MoveJobSaga(IServiceBus bus)
    {
        _bus = bus;
        State = new MoveJobState();
    }

    public MoveJobState State { get; set; }
    public Guid Id { get; set; }
    public bool IsCompleted { get; set; }

    public void Consume(TriggerMoveJobCommand message)
    {
        State.TerminalsToProcess = message.Terminals.Count();
        State.JobId = message.JobId;
        foreach (var terminal in message.Terminals)
        {
            _bus.Publish(new TriggerMoveTerminalCommand() 
            { 
                CorrelationId = message.CorrelationId,
                Name = terminal.Name
            });
        }
    }

    public void Consume(TriggerMoveTerminalCommand message)
    {
        var result = _tfService.MoveTerminalFiles(message.SourceTifDir, message.TargetTifDir, message.SourceDatDir, message.TargetDatDir);
        State.TerminalsProcessed++;

        if (State.TerminalsToProcess == State.TerminalsProcessed)
        {
            _bus.Publish(new MoveJobCompletedEvent() 
            { 
               Success = State.Success, 
               JobId = State.JobId });
        }
    }
}

public class MoveJobState
{
    public MoveJobState()
    {
        Success = true;
    }
    public int TerminalsToProcess { get; set; }
    public int TerminalsProcessed { get; set; }
    public int JobId { get; set; }
    public bool Success { get; set; }
}
}

Конфигурация хоста:

<rhino.esb>
<bus threadCount="1" numberOfRetries="5" endpoint="msmq://localhost/myapp.host" />
<messages />
</rhino.esb>

самозагрузки:

public class HostBootStrapper: StructureMapBootStrapper
{
    protected override void ConfigureContainer()
    {
        base.ConfigureContainer();
        Container.Configure(sm =>
        {
            sm.For<ISagaPersister<MoveJobSaga>>().Use<InMemorySagaPersister<MoveJobSaga>>();
            sm.Scan(x =>
            {
                x.TheCallingAssembly();
                x.WithDefaultConventions();
            });
        });
    }
}

1 ответ

Решение

Мне нужно было зарегистрировать ISagaPersister как синглтон:

            sm.For<ISagaPersister<MoveJobSaga>>()
                .Singleton()
                .Use<InMemorySagaPersister<MoveJobSaga>>();

Также мне пришлось перенести публикацию всех сообщений, используемых сагой, за ее пределы. Поэтому я создал класс обслуживания, который запускает начальное сообщение, а затем каждое сообщение в задании.

Другие вопросы по тегам