Графики MassTransitStateMachine нарушены?

Использование служебной шины Azure в качестве транспорта, но запланированные сообщения не работают, кроме как при вызовах из IConsumer.

Я провел часы и дни и до сих пор мало что понимаю, что происходит.

Может кто-нибудь объяснить, что мне нужно сделать, чтобы графики работали от конечного автомата с помощью служебной шины Azure? И, возможно, почему сообщение о расписании работает из контекста IConsumer, а не где-либо еще.

 public class BatchCollector : MassTransitStateMachine<BufferSaga>
{
    public BatchCollector(IBatchFactory batchFactory)
    {
        InstanceState(saga => saga.State);
        Event(() => BufferedCommandDetected,
            _ => _.CorrelateById(context => context.Message.GetBatchId()));

       Schedule(() => WindowElapsed, x => x.BatchCompletionId, x =>
        {
            x.Delay = TimeSpan.FromSeconds(5);
            x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
        });


        Initially(
            When(BufferedCommandDetected)
                .Then(
                    context =>
                    {
                        context.Instance.CorrelationId = context.Data.GetBatchId();
                        context.Instance.Id = Guid.NewGuid().ToString("N");
                        context.Instance.Buffer.Add(context.Data);
                        context.Instance.BatchStartTime = DateTimeOffset.Now;
                        context.Instance.AbsoluteDeadLine = DateTimeOffset.Now + context.Data.AbsoluteWindowSpan;
                        context.Instance.SlidingDeadLine = DateTimeOffset.Now + context.Data.SlidingWindowSpan;
                    })
                .Schedule(WindowElapsed,
                    context => new WindowElapsed {CorrelationId = context.Instance.CorrelationId },
                    delayProvider: scheduleDelayProvider => scheduleDelayProvider.Data.SlidingWindowSpan < scheduleDelayProvider.Data.AbsoluteWindowSpan ? scheduleDelayProvider.Data.SlidingWindowSpan : scheduleDelayProvider.Data.AbsoluteWindowSpan)
                .TransitionTo(Waiting));

        During(Waiting,
            When(BufferedCommandDetected)
                .Then(context =>
                {
                    context.Instance.SlidingDeadLine += context.Data.SlidingWindowSpan;
                    context.Instance.Buffer.Add(context.Data);
                }),
            When(WindowElapsed.Received, context => context.Instance.SlidingDeadLine > DateTimeOffset.Now && context.Instance.AbsoluteDeadLine > DateTimeOffset.Now)
                .Schedule(WindowElapsed, context => new WindowElapsed { CorrelationId = context.Instance.CorrelationId }),
            When(WindowElapsed.Received, context => context.Instance.SlidingDeadLine <= DateTimeOffset.Now || context.Instance.AbsoluteDeadLine <= DateTimeOffset.Now)
                //.Unschedule(WindowElapsed)
                .Publish(context => new Batch()
                {
                    BatchId = context.Instance.BatchCompletionId ?? Guid.NewGuid(),
                    Content = context.Instance.Buffer,
                    StartTime = context.Instance.BatchStartTime,
                    EndTime = DateTimeOffset.Now
                })
                .Finalize()
                .TransitionTo(BufferCompleted));

        SetCompletedWhenFinalized();
    }

    public Event<BufferedCommand> BufferedCommandDetected { get; private set; }


    public Schedule<BufferSaga, WindowElapsed> WindowElapsed { get; private set; }

    public State Waiting { get; private set; }

    public State BufferCompleted { get; private set; }
}

Автобус init:

container.RegisterType<IBusControl>(
            new HierarchicalLifetimeManager(),
            new InjectionFactory(c =>
            {
                var bus = Bus.Factory.CreateUsingAzureServiceBus(
                    cfg =>
                    {
                        var azSbHost = cfg.Host(new Uri(CloudConfigurationManager.GetSetting("ServiceBus.Url"))
                            , host =>
                            {
                                host.TokenProvider = TokenProvider
                                    .CreateSharedAccessSignatureTokenProvider
                                    (CloudConfigurationManager.GetSetting("ServiceBus.SharedAccessKeyName"),
                                        CloudConfigurationManager.GetSetting("ServiceBus.AccessKey"),
                                        TokenScope.Namespace);
                            });

                        cfg.ReceiveEndpoint(
                            azSbHost,
                            "Quartz.Scheduler",
                            sbConfig =>
                                {
                                    cfg.UseMessageScheduler(sbConfig.InputAddress);
                                    sbConfig.Consumer(() => new ScheduleMessageConsumer(c.Resolve<IScheduler>()));
                                }
                        );

                        cfg.ReceiveEndpoint(
                            azSbHost,
                            Assembly.GetExecutingAssembly().GetName().Name,
                            sbConfig =>
                            {
                                AllClasses.FromAssembliesInBasePath()
                                    .Where(
                                        @class =>
                                            (@class?.Namespace?.StartsWith("bcn",
                                                 StringComparison.OrdinalIgnoreCase) ?? false)
                                            &&
                                            @class.GetParentClasses()
                                                .Any(
                                                    parent =>
                                                            parent.Name.StartsWith("MassTransitStateMachine`1")))
                                    .ForEach(@class =>
                                    {
                                        //dynamic cast to avoid having to deal with generic typing when type is not known until runtime.                                                
                                        dynamic stateMachineExtension =
                                            new DynamicStaticWrapper(typeof(StateMachineSubscriptionExtensions));
                                        stateMachineExtension
                                            .StateMachineSaga(
                                                sbConfig,
                                                c.Resolve(@class),
                                                c.Resolve(typeof(ISagaRepository<>).MakeGenericType(
                                                    @class.GetParentClasses().First(parent =>
                                                                parent.Name.StartsWith("MassTransitStateMachine`1"))
                                                        .GetGenericArguments().First())));
                                    });



                                AllClasses.FromAssembliesInBasePath()
                                    .Where(
                                        @class =>
                                            (@class?.Namespace?.StartsWith("bcn", StringComparison.OrdinalIgnoreCase) ??
                                             false)
                                            && @class.GetInterfaces().Any(
                                                @interface =>
                                                    @interface?.FullName?.StartsWith("MassTransit.IConsumer`1") ??
                                                    false))
                                    .ForEach(@class =>
                                    {
                                        var factoryType = typeof(UnityConsumerFactory<>).MakeGenericType(@class);
                                        //Automatically register consumers.
                                        dynamic consumerFactory = Activator.CreateInstance(
                                            factoryType,
                                            container);
                                        var consumingMethod = typeof(ConsumerExtensions).
                                            GetMethods()
                                            .First(
                                                m =>
                                                    m.Name == "Consumer" && m.IsGenericMethod &&
                                                    m.GetGenericArguments().Length == 1 &&
                                                    m.GetParameters().Length == 3)
                                            .MakeGenericMethod(@class)
                                            .Invoke(null, new object[] {sbConfig, consumerFactory, null});

                                        //Automatically detect which payload contains message data. This message data is stored in blob.
                                        @class.GetInterfaces().Where(
                                                @interface =>
                                                        @interface.FullName.StartsWith("MassTransit.IConsumer`1"))
                                            .Select(@interface => @interface.GetGenericArguments().First())
                                            .Where(payload => payload.GetProperties()
                                                .Any(prop => prop.PropertyType.Name.StartsWith("MessageData`1")))
                                            .ForEach(
                                                BlobType =>
                                                    typeof(MessageDataConfiguratorExtensions)
                                                        .GetMethods()
                                                        .First(
                                                            method =>
                                                                method.GetParameters().First().ParameterType ==
                                                                typeof(IConsumePipeConfigurator)
                                                                &&
                                                                method.GetParameters().Last().ParameterType ==
                                                                typeof(IMessageDataRepository))
                                                        .MakeGenericMethod(BlobType)
                                                        .Invoke(null,
                                                            new object[]
                                                                {sbConfig, c.Resolve<IMessageDataRepository>()}));
                                    });
                            });

                        cfg.UseServiceBusMessageScheduler();
                        //azSbHost.
                    });

                return bus;
            }));
        container.RegisterType<IBus, IBusControl>();
        container.RegisterType<IBus, IBusControl>(new ContainerControlledLifetimeManager());

И тут началось:

  var container = UnityConfig.GetConfiguredContainer();
        var bus = container.Resolve<IBusControl>();
        bus.Start();

        var scheduler = container.Resolve<IScheduler>();
        scheduler.Start();

        bus.Publish<BufferedCommand>(new BufferedCommandAdapter<decimal>(10m, TimeSpan.FromSeconds(5),
            TimeSpan.FromSeconds(5)));

1 ответ

Вы настраиваете фабрику заданий на кварц? Посмотрите, как библиотека QuartzIntegration выполняет настройку:

https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.QuartzIntegration/QuartzIntegrationExtensions.cs

Кроме того, используйте наблюдатель вокруг шины, чтобы кварц запускался / останавливался / останавливался на одной линии с шиной.

https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.QuartzIntegration/Configuration/SchedulerBusObserver.cs

Другие вопросы по тегам