rabbitmq и php - обрабатывают несколько очередей с одним работником (брокером)

У меня 1000 очередей с конкретными именами. поэтому я хочу обработать эти очереди с одним брокером. Является ли это возможным?

имена очередей хранятся в mysql db, поэтому я должен выбрать тему и запустить брокер для каждого. и, конечно, он должен работать асинхронно и должен быть в состоянии передать элемент в очереди незанятому посреднику. Это возможно? или я должен сделать 1000 файлов с конкретными именами очередей в качестве посредников?

Обновление: это картина моих очередей. очереди должны выполняться параллельно, а не последовательно. поэтому пользователи являются производителем, а работник является потребителем, который запускает send_message() Способ;

1 ответ

Решение

Я могу показать вам, как это сделать с библиотекой. Я должен предупредить вас, что нет способа асинхронно потреблять сообщения в одном процессе. Хотя вы можете запустить несколько процессов, которые обслуживают множество очередей. Их можно разделить на группы по важности очереди.

Установите библиотеку транспорта и потребления AMQP:

composer require enqueue/amqp-ext enqueue/enqueue

Создайте сценарий потребления. Я предполагаю, что у вас есть массив имен очередей, уже извлеченных из БД. Они хранятся в $queueNames вар. В этом примере один и тот же процессор привязан ко всем очередям, но вы, конечно, можете устанавливать разные.

<?php

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;

// here's the list of queue names which you fetched from DB
$queueNames = ['foo_queue', 'bar_queue', 'baz_queue'];

$factory = new AmqpConnectionFactory('amqp://');

$context = $factory->createContext();

// create queues at RabbitMQ side, you can remove it if you do not need it
foreach ($queueNames as $queueName) {
    $queue = $context->createQueue($queueName);
    $queue->addFlag(AMQP_DURABLE);

    $context->declareQueue($queue);
}

$consumer = new QueueConsumer($context);

foreach ($queueNames as $queueName) {
    $consumer->bind($queueName, function(PsrMessage $psrMessage) use ($queueName) {
        echo 'Consume the message from queue: '.$queueName;

        // your processing logic.

        return PsrProcessor::ACK;
    });
}

$consumer->consume();

Больше в документе

Другие вопросы по тегам