rabbitmq и php - обрабатывают несколько очередей с одним работником (брокером)
У меня 1000 очередей с конкретными именами. поэтому я хочу обработать эти очереди с одним брокером. Является ли это возможным?
имена очередей хранятся в mysql db, поэтому я должен выбрать тему и запустить брокер для каждого. и, конечно, он должен работать асинхронно и должен быть в состоянии передать элемент в очереди незанятому посреднику. Это возможно? или я должен сделать 1000 файлов с конкретными именами очередей в качестве посредников?
Обновление: это картина моих очередей. очереди должны выполняться параллельно, а не последовательно. поэтому пользователи являются производителем, а работник является потребителем, который запускает send_message()
Способ;
1 ответ
Я могу показать вам, как это сделать с библиотекой. Я должен предупредить вас, что нет способа асинхронно потреблять сообщения в одном процессе. Хотя вы можете запустить несколько процессов, которые обслуживают множество очередей. Их можно разделить на группы по важности очереди.
Установите библиотеку транспорта и потребления AMQP:
composer require enqueue/amqp-ext enqueue/enqueue
Создайте сценарий потребления. Я предполагаю, что у вас есть массив имен очередей, уже извлеченных из БД. Они хранятся в $queueNames
вар. В этом примере один и тот же процессор привязан ко всем очередям, но вы, конечно, можете устанавливать разные.
<?php
use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;
// here's the list of queue names which you fetched from DB
$queueNames = ['foo_queue', 'bar_queue', 'baz_queue'];
$factory = new AmqpConnectionFactory('amqp://');
$context = $factory->createContext();
// create queues at RabbitMQ side, you can remove it if you do not need it
foreach ($queueNames as $queueName) {
$queue = $context->createQueue($queueName);
$queue->addFlag(AMQP_DURABLE);
$context->declareQueue($queue);
}
$consumer = new QueueConsumer($context);
foreach ($queueNames as $queueName) {
$consumer->bind($queueName, function(PsrMessage $psrMessage) use ($queueName) {
echo 'Consume the message from queue: '.$queueName;
// your processing logic.
return PsrProcessor::ACK;
});
}
$consumer->consume();
Больше в документе