Как реализовать подписку / публикацию Rebus между двумя микросервисами с копиями DTO, расположенными в разных сборках?

Я использую Rebus для следующей общей ситуации, естественной для WCF или WebAPI. Есть два микросервиса, имеющие одинаковый набор DTO в своих сборках, для обмена событиями интеграции. Классы DTO похожи, но они находятся в разных сборках с разными платформами.Net Core и.Net Framework. Делать DTO в отдельную общую сборку я бы не хотел. Как реализовать подписку / публикацию для обмена событиями интеграции между этими двумя микроуслугами на основе двух копий DTO, расположенных в разных сборках?

Пример первого микросервиса:

public class Message
{
    public int MessageNumber { get; set; }
}

class Program
{
    static void Main()
    {
        using (var adapter = new BuiltinHandlerActivator())
        {
            Configure.With(adapter)
                .Logging(l => l.ColoredConsole(LogLevel.Warn))
                .Transport(t => t.UseRabbitMqAsOneWayClient("amqp://myRebbitMQ"))
                .Start();

            var keepRunning = true;
            while (keepRunning)
            {
                var key = char.ToLower(Console.ReadKey(true).KeyChar);
                switch (key)
                {
                    case 'f':
                        Console.WriteLine("Publishing {0} messages", 1000);
                        var jobs = Enumerable.Range(0, 1000)
                            .Select(i => new Message { MessageNumber = i })
                            .Select(m => adapter.Bus.Publish(m))
                            .ToArray();
                        Task.WaitAll(jobs);
                        break;
                    case 'q':
                        Console.WriteLine("Quitting");
                        keepRunning = false;
                        break;
                }
            }
        }
    }
}

Пример второго микросервиса:

public class Message
{
    public int MessageNumber { get; set; }
}

public class MessageHandler : IHandleMessages<Message>
{
    public async Task Handle(Message message)
    {
        Console.WriteLine("Processing message {0}", message.MessageNumber);

        await Task.Delay(200);
    }
}

class Program
{
    private static IContainer _container;
    static void Main()
    {
        var builder = new ContainerBuilder();
        builder.RegisterType<MessageHandler>().As(typeof(IHandleMessages<>).MakeGenericType(typeof(Message)));
        builder.RegisterRebus((configurer, context) => configurer
            .Logging(l => l.ColoredConsole(LogLevel.Warn))
            .Transport(t => t.UseRabbitMq("amqp://myRebbitMQ", "consumer2"))
            //.Serialization(s => s.UseNewtonsoftJson(new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects })) ???
            .Options(o => o.SetMaxParallelism(5))
        );
        using (_container = builder.Build())
        {
            var eventBus = _container.Resolve<IBus>();
            eventBus.Subscribe<Message>().Wait();
            Console.WriteLine("Consumer listening - press ENTER to quit");
            Console.ReadLine();
        }
    }
}

1 ответ

Решение

В то время как у Rebus есть выдающиеся API для основанных на типах pub/sub, базовая реализация работает с обычными тематическими строками.

Это означает, что, например,

await bus.Subscribe<YourMessage>();

а также

await bus.Publish(new YourMessage());

просто переводятся в подписку и публикуют операции по теме "YourMessages.YourMessage, YourMessages" (здесь притворяясь, что YourMessage находится в YourMessages сборка.

Вы можете получить доступ к необработанному API на основе тем, используя API тем, который вы можете получить здесь:

var topics = bus.Advanced.Topics;

и тогда ты сможешь

await topics.Subscribe("to anything");

а также

await topics.Publish("to anything", new YourMessage());

Поэтому, чтобы решить вашу проблему, я предлагаю вам воспользоваться этим и выбрать общую тему для каждого типа событий.

Другие вопросы по тегам