Несколько рабочих используют одно и то же сообщение из очереди RabbitMQ
Я использую модуль py-amqp и Python 3.4. Когда я запускаю более 1 слушателя и запускаю одного производителя для публикации сообщений, слушатели берут одно сообщение и начинают обрабатывать его одновременно. Мне не нужно такое поведение, потому что сообщения должны быть записаны в БД только один раз. Поэтому самый быстрый работник пишет сообщение в БД, а все остальные рабочие говорят, что сообщение уже существует.
режиссер:
import json
import amqp
import random
from application.settings import RMQ_PASSWORD, RMQ_USER, RMQ_HOST, RMQ_EXCHANGE
def main():
conn = amqp.Connection(RMQ_HOST, RMQ_USER,
RMQ_PASSWORD, ssl=False)
ch = conn.channel()
ch.exchange_declare(RMQ_EXCHANGE, 'fanout')
req = {"request": {"transaction_number": random.randint(100000, 9999999999)}}
message = json.dumps(req)
msg = amqp.Message(message)
ch.basic_publish(msg, RMQ_EXCHANGE)
ch.close()
conn.close()
if __name__ == '__main__':
for x in range(100):
main()
работник:
from functools import
from pipeline import pipeline, dal
from settings import DB_CONNECTION_STRING, RMQ_EXCHANGE, RMQ_HOST, RMQ_PASSWORD, RMQ_USER
import amqp
DB = dal.DAL(DB_CONNECTION_STRING)
message_processor = pipeline.Pipeline(DB)
def callback(channel, msg):
channel.basic_ack(msg.delivery_tag)
message_processor.process(msg)
if msg.body == 'quit':
channel.basic_cancel(msg.consumer_tag)
def main():
conn = amqp.Connection(RMQ_HOST, RMQ_USER,
RMQ_PASSWORD, ssl=False)
ch = conn.channel()
ch.exchange_declare(RMQ_EXCHANGE, 'fanout')
qname, _, _ = ch.queue_declare()
ch.queue_bind(qname, RMQ_EXCHANGE)
ch.basic_consume(qname, callback=partial(callback, ch))
while ch.callbacks:
ch.wait()
ch.close()
conn.close()
if __name__ == '__main__':
print('Listener starting')
main()
также:
user@RabbitMQ:~$ sudo rabbitmqctl list_bindings
Listing bindings ...
exchange amq.gen--crTjfeSlue6gw0LRwW7pQ queue amq.gen--crTjfeSlue6gw0LRwW7pQ []
exchange amq.gen-1X3vwGF5OKn_gcnofpJKFg queue amq.gen-1X3vwGF5OKn_gcnofpJKFg []
...
exchange amq.gen-yf8ieG1AK9x83Vz4GBj-ZA queue amq.gen-yf8ieG1AK9x83Vz4GBj-ZA []
exchange entryapi.test queue entryapi.test []
entryapi exchange entryapi.test queue []
azaza exchange amq.gen--crTjfeSlue6gw0LRwW7pQ queue []
azaza exchange amq.gen-1X3vwGF5OKn_gcnofpJKFg queue []
...
azaza exchange amq.gen-yf8ieG1AK9x83Vz4GBj-ZA queue []
azaza exchange entryapi.test queue []
...done.
1 ответ
Я думаю, что вы используете неправильный тип настройки для вашего варианта использования. У вас есть издатель, публикующий на бирже, и вы хотите прочитать сообщения и записать их в БД. Вы хотите сделать это со многими потребителями, пишущими в БД, чтобы увеличить пропускную способность. Обмены разветвления реплицируют сообщение, так что несколько очередей и потребителей приведут к множественной записи одних и тех же данных в БД. Вам нужно использовать "Очереди работы". Каждый обмен будет обменом по умолчанию (без типа или прямым обменом со всеми сообщениями, использующими один и тот же ключ маршрутизации). Все сообщения, отправленные на биржу, будут направлены в одну очередь. Каждая очередь будет иметь несколько потребителей. Каждое сообщение будет прочитано из очереди один раз и только одним потребителем из вашей группы потребителей, а затем будет записано только один раз в БД.
Читайте больше здесь http://www.rabbitmq.com/tutorials/tutorial-two-python.html