Распределение потребителей RabbitMQ

Нужна помощь в разработке потребительского распределения rabbit-mq.

Например, существует 100 очередей и 10 потоков для приема сообщений из этой 100 очередей. Каждый поток будет потреблять сообщения из 10 очередей каждый. Вопрос 1: Как динамически назначать потоки очередям? Если потоки работают на разных машинах?

Не более одного потока должно потреблять из очереди (для поддержания порядка обработки сообщения в соответствующей очереди). Вопрос 2: Когда необходимо увеличить потоки потребителя во время работы системы, как это можно сделать?

1 ответ

Есть много сообщений о порядке сообщений (FIFO), у вас нормальная ситуация (один производитель, один потребитель без проблем с сетью), у вас нет проблем. Но как вы можете прочитать здесь

В частности, обратите внимание на условие "если поле для доставки не установлено", что означает, что любое отключение от потребителей может привести к тому, что сообщения, ожидающие подтверждения, будут впоследствии доставлены не в порядке.

Кроме того, например, если вы публикуете сообщение, и во время публикации произошла ошибка, вы должны повторно опубликовать сообщение в правильном порядке. Это означает, что если вам абсолютно необходим порядок сообщений, вы должны реализовать его, например, пометить каждый пакет порядковым номером, а также реализовать подтверждение публикации.

Я думаю, но это мое мнение, что когда вы используете систему сообщений, вам не нужно беспокоиться о порядке сообщений, потому что это должно быть ваше приложение, способное управлять данными.

Сказав это, если мы предположим, что 100 очередей должны обрабатывать сообщения одного и того же типа, вы можете использовать ThreadPoolExecutor и поделиться им со всеми потребителями. Например:

public class ActualConsumer extends DefaultConsumer { 
public ActualConsumer(Channel channel) { 
super(channel); 
} 
@Override 
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws java.io.IOException { 
MyMessage message = new MyMessage(body); 
    mythreadPoolExecutorShared.submit(new MyHandleMessage(message))
} 

}

Таким образом, вы можете сбалансировать сообщения между потоками. Также для threadPool вы можете использовать различные политики, например статическое распределение с фиксированным номером потока или динамическое распределение потока.
Пожалуйста, прочитайте этот пост об изменении размера пула потоков ( можете ли вы динамически изменить размер java.util.concurrent.ThreadPoolExecutor, пока у него все еще есть задачи). Вы можете применить этот шаблон ко всем узлам, таким образом вы можете сбалансировать диспетчерские сообщения и назначить правильный номер темы.

Я надеюсь, что это может быть полезно, я хотел бы быть более подробным, но ваш вопрос немного общий.

Другие вопросы по тегам