Как реализовать подписку / публикацию Rebus между двумя микросервисами с копиями DTO, расположенными в разных сборках?
Я использую Rebus для следующей общей ситуации, естественной для WCF или WebAPI. Есть два микросервиса, имеющие одинаковый набор DTO в своих сборках, для обмена событиями интеграции. Классы DTO похожи, но они находятся в разных сборках с разными платформами.Net Core и.Net Framework. Делать DTO в отдельную общую сборку я бы не хотел. Как реализовать подписку / публикацию для обмена событиями интеграции между этими двумя микроуслугами на основе двух копий DTO, расположенных в разных сборках?
Пример первого микросервиса:
public class Message
{
public int MessageNumber { get; set; }
}
class Program
{
static void Main()
{
using (var adapter = new BuiltinHandlerActivator())
{
Configure.With(adapter)
.Logging(l => l.ColoredConsole(LogLevel.Warn))
.Transport(t => t.UseRabbitMqAsOneWayClient("amqp://myRebbitMQ"))
.Start();
var keepRunning = true;
while (keepRunning)
{
var key = char.ToLower(Console.ReadKey(true).KeyChar);
switch (key)
{
case 'f':
Console.WriteLine("Publishing {0} messages", 1000);
var jobs = Enumerable.Range(0, 1000)
.Select(i => new Message { MessageNumber = i })
.Select(m => adapter.Bus.Publish(m))
.ToArray();
Task.WaitAll(jobs);
break;
case 'q':
Console.WriteLine("Quitting");
keepRunning = false;
break;
}
}
}
}
}
Пример второго микросервиса:
public class Message
{
public int MessageNumber { get; set; }
}
public class MessageHandler : IHandleMessages<Message>
{
public async Task Handle(Message message)
{
Console.WriteLine("Processing message {0}", message.MessageNumber);
await Task.Delay(200);
}
}
class Program
{
private static IContainer _container;
static void Main()
{
var builder = new ContainerBuilder();
builder.RegisterType<MessageHandler>().As(typeof(IHandleMessages<>).MakeGenericType(typeof(Message)));
builder.RegisterRebus((configurer, context) => configurer
.Logging(l => l.ColoredConsole(LogLevel.Warn))
.Transport(t => t.UseRabbitMq("amqp://myRebbitMQ", "consumer2"))
//.Serialization(s => s.UseNewtonsoftJson(new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects })) ???
.Options(o => o.SetMaxParallelism(5))
);
using (_container = builder.Build())
{
var eventBus = _container.Resolve<IBus>();
eventBus.Subscribe<Message>().Wait();
Console.WriteLine("Consumer listening - press ENTER to quit");
Console.ReadLine();
}
}
}
1 ответ
В то время как у Rebus есть выдающиеся API для основанных на типах pub/sub, базовая реализация работает с обычными тематическими строками.
Это означает, что, например,
await bus.Subscribe<YourMessage>();
а также
await bus.Publish(new YourMessage());
просто переводятся в подписку и публикуют операции по теме "YourMessages.YourMessage, YourMessages"
(здесь притворяясь, что YourMessage
находится в YourMessages
сборка.
Вы можете получить доступ к необработанному API на основе тем, используя API тем, который вы можете получить здесь:
var topics = bus.Advanced.Topics;
и тогда ты сможешь
await topics.Subscribe("to anything");
а также
await topics.Publish("to anything", new YourMessage());
Поэтому, чтобы решить вашу проблему, я предлагаю вам воспользоваться этим и выбрать общую тему для каждого типа событий.