RabbitMQ- выборочно извлекает сообщения из очереди
Я новичок в RabbitMQ и мне был интересен хороший подход к этой проблеме, над которой я размышляю. Я хочу создать службу, которая подписывается на очередь и извлекает только те сообщения, которые соответствуют определенным критериям; например, если в сообщении указан заголовок конкретной темы.
Я все еще изучаю RabbitMQ и искал советы о том, как подойти к этому. Мои вопросы: как потребитель может извлекать только определенные сообщения из очереди? Как продюсер может установить заголовок темы в сообщении (если это даже правильный термин?)
1 ответ
RabbitMQ идеально подходит для этой ситуации. У вас есть несколько вариантов, чтобы сделать то, что вы хотите. Я предлагаю прочитать документацию, чтобы лучше понять. Я бы предложил вам использовать тему или прямой обмен. Тема более гибкая. Это идет так.
Код производителя соединяется с RabbitMQ Broker и создает Exchange с определенным именем.
Производитель публикует для обмена. Каждое опубликованное сообщение будет опубликовано с ключом маршрутизации.
Потребитель подключается к брокеру RabbitMQ.
Потребитель создает очередь
Потребитель привязывает очередь к обмену, такому же обмену, определенному в производителе. Привязка также включает ключи маршрутизации для каждого сообщения, требуемого для этого конкретного потребителя.
Допустим, вы публиковали сообщения журнала. Ключом маршрутизации может быть что-то вроде "log.info", "log.warn", "log.error". Каждое сообщение, опубликованное производителем, будет иметь соответствующий ключ маршрутизации. Затем у вас будет получатель, который отправит и отправит электронное письмо со всеми сообщениями об ошибках, и еще один, который записывает все сообщения об ошибках в файл. Таким образом, электронная почта определит привязку из своей очереди к обмену с помощью ключа маршрутизации "log.error". Таким образом, хотя обмен получает все сообщения, очередь, определенная для электронной почты, будет содержать только сообщения об ошибках. Filelogger определит новую отдельную очередь, привязанную к тому же обмену, и настроит другой ключ маршрутизации. Вы можете сделать три отдельных привязки для трех разных требуемых ключей маршрутизации или просто использовать подстановочный знак "log.*" Для запроса всех сообщений от обмена, начиная с log.
Это простой пример, который показывает, как вы можете достичь того, что вы хотите сделать.
посмотрите здесь примеры кода, в частности номер учебника № 5.
Рекомендуется оптимизировать обмен / маршрутизацию rabbitmq. Если вы действительно хотите проверить содержание сообщения, следующий код - одно из решений.
Получать сообщения из очереди и проверять, выборочно подтверждать интересующие вас сообщения.
вытащить одно сообщение
GetResponse resp = channel.basicGet(QUEUE_NAME, false);
Подтвердите одно сообщение
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
пример
import com.rabbitmq.client.*;
public class ReceiveLogs {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel();){
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// pull one message and ack manually and exit
GetResponse resp = channel.basicGet(QUEUE_NAME, false);
if( resp != null ){
String message = new String(resp.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
}
System.out.println();
}
}
}
зависимость
compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.8.0'
1. To Retrive Message from RabbitMQ we need to first connect with RabbitMQ server
public WebClient GetRabbitMqConnection(string userName, string password)
{
var client = new WebClient();
client.Credentials = new NetworkCredential(userName, password);
return client;
}
2. Now retrieve message from RabbitMQ using below code.
public string GetRabbitMQMessages(string domainName, string port, string
queueName, string virtualHost, WebClient client, string
methodType)
{
string messageResult = string.Empty;
string strUri = "http://" + domainName + ":" + port +
"/api/queues/" + virtualHost + "/";
var data = client.DownloadString(strUri + queueName + "/");
var queueInfo = JsonConvert.DeserializeObject<QueueInfo>(data);
if (queueInfo == null || queueInfo.messages == 0)
return string.Empty;
if (methodType == "POST")
{
string postbody = "
{\"ackmode\":\"ack_requeue_true\",\"count\":
\"$totalMessageCount\",\"name\":\"${DomainName}\",
\"requeue\":\"false\",\"encoding\":\"auto\",\"vhost\" :
\"${QueueName}\"}";
postbody = postbody.Replace("$totalMessageCount",
queueInfo.messages.ToString()).Replace("${DomainName}",
domainName).Replace("${QueueName}", queueName);
messageResult = client.UploadString(strUri + queueName +
"/get", "POST", postbody);
}
return messageResult;
}
I think this will help you to implement RabbitMQ.
Если вы хотите получить одно сообщение за один раз, добавьте следующие свойства в ваш код получения.
Boolean autoAck = false;
model.BasicConsume(Queuename, autoAck);
model.BasicGet("Queuename", false);
model.BasicGet("Queuename", false);
Добавляя эти свойства RabbitMQ, вы можете извлекать сообщение одно за другим из очереди. Аналогично критериям FIFO