RabbitMQ- выборочно извлекает сообщения из очереди

Я новичок в RabbitMQ и мне был интересен хороший подход к этой проблеме, над которой я размышляю. Я хочу создать службу, которая подписывается на очередь и извлекает только те сообщения, которые соответствуют определенным критериям; например, если в сообщении указан заголовок конкретной темы.

Я все еще изучаю RabbitMQ и искал советы о том, как подойти к этому. Мои вопросы: как потребитель может извлекать только определенные сообщения из очереди? Как продюсер может установить заголовок темы в сообщении (если это даже правильный термин?)

1 ответ

Решение

RabbitMQ идеально подходит для этой ситуации. У вас есть несколько вариантов, чтобы сделать то, что вы хотите. Я предлагаю прочитать документацию, чтобы лучше понять. Я бы предложил вам использовать тему или прямой обмен. Тема более гибкая. Это идет так.

Код производителя соединяется с RabbitMQ Broker и создает Exchange с определенным именем.

Производитель публикует для обмена. Каждое опубликованное сообщение будет опубликовано с ключом маршрутизации.

Потребитель подключается к брокеру RabbitMQ.

Потребитель создает очередь

Потребитель привязывает очередь к обмену, такому же обмену, определенному в производителе. Привязка также включает ключи маршрутизации для каждого сообщения, требуемого для этого конкретного потребителя.

Допустим, вы публиковали сообщения журнала. Ключом маршрутизации может быть что-то вроде "log.info", "log.warn", "log.error". Каждое сообщение, опубликованное производителем, будет иметь соответствующий ключ маршрутизации. Затем у вас будет получатель, который отправит и отправит электронное письмо со всеми сообщениями об ошибках, и еще один, который записывает все сообщения об ошибках в файл. Таким образом, электронная почта определит привязку из своей очереди к обмену с помощью ключа маршрутизации "log.error". Таким образом, хотя обмен получает все сообщения, очередь, определенная для электронной почты, будет содержать только сообщения об ошибках. Filelogger определит новую отдельную очередь, привязанную к тому же обмену, и настроит другой ключ маршрутизации. Вы можете сделать три отдельных привязки для трех разных требуемых ключей маршрутизации или просто использовать подстановочный знак "log.*" Для запроса всех сообщений от обмена, начиная с log.

Это простой пример, который показывает, как вы можете достичь того, что вы хотите сделать.

посмотрите здесь примеры кода, в частности номер учебника № 5.

Рекомендуется оптимизировать обмен / маршрутизацию rabbitmq. Если вы действительно хотите проверить содержание сообщения, следующий код - одно из решений.

Получать сообщения из очереди и проверять, выборочно подтверждать интересующие вас сообщения.

вытащить одно сообщение

GetResponse resp = channel.basicGet(QUEUE_NAME, false);

Подтвердите одно сообщение

channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);

пример

import com.rabbitmq.client.*;

public class ReceiveLogs {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();){

            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // pull one message and ack manually and exit
            GetResponse resp = channel.basicGet(QUEUE_NAME, false);
            if( resp != null ){
                String message = new String(resp.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
            }
            System.out.println();
        }
    }
}

зависимость

compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.8.0'
  1. To Retrive Message from RabbitMQ we need to first connect with RabbitMQ server 

    public WebClient GetRabbitMqConnection(string userName, string password)
    {
        var client = new WebClient(); 
        client.Credentials = new NetworkCredential(userName, password);
        return client;
    }

 2. Now retrieve message from RabbitMQ using below code.

      public string GetRabbitMQMessages(string domainName, string port, string 
                    queueName, string virtualHost, WebClient client, string 
                    methodType)
      {
                  string messageResult = string.Empty;
                  string strUri = "http://" + domainName + ":" + port + 
                                  "/api/queues/" + virtualHost + "/";
                  var data = client.DownloadString(strUri + queueName + "/");
                  var queueInfo = JsonConvert.DeserializeObject<QueueInfo>(data);
                  if (queueInfo == null || queueInfo.messages == 0)
                             return string.Empty;
                  if (methodType == "POST")
                  {
                      string postbody = "  
                      {\"ackmode\":\"ack_requeue_true\",\"count\":
                       \"$totalMessageCount\",\"name\":\"${DomainName}\",
                       \"requeue\":\"false\",\"encoding\":\"auto\",\"vhost\" :
                       \"${QueueName}\"}";
                       postbody = postbody.Replace("$totalMessageCount", 
                       queueInfo.messages.ToString()).Replace("${DomainName}", 
                       domainName).Replace("${QueueName}", queueName);
                       messageResult = client.UploadString(strUri + queueName + 
                       "/get", "POST", postbody);
                }
                return messageResult;
    } 


   I think this will help you to implement RabbitMQ.

Если вы хотите получить одно сообщение за один раз, добавьте следующие свойства в ваш код получения.

Boolean autoAck = false;
model.BasicConsume(Queuename, autoAck);
model.BasicGet("Queuename", false);
model.BasicGet("Queuename", false); 

Добавляя эти свойства RabbitMQ, вы можете извлекать сообщение одно за другим из очереди. Аналогично критериям FIFO

Другие вопросы по тегам