MassTransit и события против публикации команд

Я новичок в MassTransit, и мне что-то не хватает в моем понимании.

Допустим, у меня есть ферма серверов, где все узлы могут выполнять одну и ту же работу. Каркас приложения выполнен в стиле CQRS. Это означает, что у меня есть два основных вида сообщений для публикации:

  • Команды: должен обрабатываться ровно одним сервером, любым из них (первым с свободным слотом для задания)
  • События: должны обрабатываться всеми серверами

Я создал чрезвычайно простой прототип MassTransit (консольное приложение, которое посылает привет каждые X секунд).

Я вижу, что в API есть метод публикации. Как я могу указать, что это за сообщение (одно против всего сервера)?

Если я посмотрю конфигурацию "обработчик", я могу указать очередь URI. Если я укажу одну и ту же очередь для всех хостов, все хосты получат сообщение, но я не могу ограничить выполнение только одним сервером.

Если я слушаю из выделенной очереди хоста, только один сервер будет обрабатывать сообщения, но я не знаю, как транслировать сообщения другого типа.

Пожалуйста, помогите мне понять, чего мне не хватает.

PS: если это волнует, моя система обмена сообщениями - rabbitmq.

Для тестирования я создал общую библиотеку классов с этими классами:

public static class ActualProgram
{
    private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource();

    private static readonly Random g_Random = new Random();

    public static void ActualMain(int delay, int instanceName)
    {
        Thread.Sleep(delay);
        SetupBus(instanceName);

        Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);

        Console.WriteLine("Press enter at any time to exit");
        Console.ReadLine();
        g_Shutdown.Cancel();

        Bus.Shutdown();
    }

    private static void PublishRandomMessage()
    {
        Bus.Instance.Publish(new Message
        {
            Id = g_Random.Next(),
            Body = "Some message",
            Sender = Assembly.GetEntryAssembly().GetName().Name
        });

        if (!g_Shutdown.IsCancellationRequested)
        {
            Thread.Sleep(g_Random.Next(500, 10000));
            Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
        }
    }

    private static void SetupBus(int instanceName)
    {
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName);
            sbc.Subscribe(subs =>
            {
                subs.Handler<Message>(MessageHandled);
            });
        });
    }

    private static void MessageHandled(Message msg)
    {
        ConsoleColor color = ConsoleColor.Red;
        switch (msg.Sender)
        {
            case "test_app1":
                color = ConsoleColor.Green;
                break;

            case "test_app2":
                color = ConsoleColor.Blue;
                break;

            case "test_app3":
                color = ConsoleColor.Yellow;
                break;
        }
        Console.ForegroundColor = color;
        Console.WriteLine(msg.ToString());
        Console.ResetColor();
    }

    private static void MessageConsumed(Message msg)
    {
        Console.WriteLine(msg.ToString());
    }
}

public class Message
{
    public long Id { get; set; }

    public string Sender { get; set; }

    public string Body { get; set; }

    public override string ToString()
    {
        return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body);
    }
}

У меня также есть 3 консольных приложения, которые просто запускают метод ActualMain:

internal class Program
{
    private static void Main(string[] args)
    {
        ActualProgram.ActualMain(0, 1);
    }
}

1 ответ

Решение

То, что вы хотите, известно как Конкурирующие Потребители (ищите, так что вы найдете больше информации) Использование RabbitMQ упрощает жизнь, все, что вам нужно сделать, это указать одно и то же имя очереди для каждого потребителя, которого вы запускаете, сообщение будет обработано только один из них. Вместо того, чтобы генерировать уникальную очередь каждый раз, как вы делаете.

 private static void SetupBus(int instanceName)
{
    Bus.Initialize(sbc =>
    {
        sbc.UseRabbitMqRouting();
        sbc.ReceiveFrom("rabbitmq://localhost/Commands);
        sbc.Subscribe(subs =>
        {
            subs.Handler<Message>(MessageHandled);
        });
    });
}

AFAIK, вам понадобится отдельный процесс для обработчиков команд, в отличие от обработчиков событий. Все обработчики команд получат ReceiveFrom из одной и той же очереди, все обработчики событий получат ReceiveFrom из своей собственной уникальной очереди.

Другая часть головоломки заключается в том, как вы получаете сообщения в автобус. Вы по-прежнему можете использовать команду "Публикация", но если вы неправильно настроили потребителей, вы можете получить несколько выполнений, поскольку сообщение будет отправлено всем потребителям, если вы хотите гарантировать, что сообщение окажется в одной очереди, вы можете использовать "Отправить", а не "Опубликовать".

 Bus.Instance
     .GetEndpoint(new Uri("rabbitmq://localhost/Commands"))
    .Send(new Message
    {
        Id = g_Random.Next(),
        Body = "Some message",
        Sender = Assembly.GetEntryAssembly().GetName().Name
    });
Другие вопросы по тегам