Чтение сообщений из RabbitMQ с помощью php-amqlib не работает

Я работаю над взаимодействием клиент-сервер с PHP и RabbitMQ, используя php-amqplib.

У меня есть сценарий продюсера, который выглядит нормально, но мой потребитель ничего не получает.

Я проверил записи в очереди с sudo rabbitmqctl list_queues и после каждого запуска продюсера счетчик увеличивается.

Мой потребитель запускается без каких-либо ошибок PHP и выглядит как ожидание сообщений. Что не выглядит хорошо, он запускает скрипт обратного вызова один раз с пустым входящим сообщением один раз при запуске - и затем ничего не делает.

php consumer.php 
string(47) " [*] Waiting for messages. To exit press CTRL+C"
string(1) "
"
string(10) "Received: "

Вот мои коды:

producer.php

public function sendDataToRabbitMQ()
{
    $id = $_POST['id'];
    $ipAddress = $_POST['ip'];
    $date = date("Y-m-d h:i:s");
    $status = false;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); //host: RABBITMQ_HOST
    $channel = $connection->channel();

    $channel->queue_declare('first_queue', false, true, false, false);

    if(is_array($argv)) {
        $data = implode(' ', array_slice($argv, 1));
    }

    if (empty($data)) {
        $data = "$ipAddress,$id";
    }

    $msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));

    $channel->basic_publish($msg, '', 'first_queue');

    echo " [x] Sent data:", "\n", $data, "\n";

    $channel->close();
    $connection->close();

    return $result;
}

(изменил константу имени хоста на localhost для тестирования)

consumer.php

<?php

require_once '../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class Consumer
{
    private $token;

    private function getQueue()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();

        $channel->queue_declare('ban_queue', false, true, false, false);

        var_dump(' [*] Waiting for messages. To exit press CTRL+C', "\n");

        $callback = function($msg) {

            $message = explode(',', $msg->body);

            var_dump('Received: '.$message[0]);

            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        $channel->basic_qos(null, 1, null);
        $channel->basic_consume('first_queue', '', false, false, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }

    public function processResult()
    {
        $this->getQueue();
    }

}

$consumer = new Consumer();
$consumer->processResult();

Почему это не работает? Я нашел учебники и документацию по rabbitmq/php-amqplib довольно бесполезными, и я уже почти пол дня занят этим вопросом. Любая помощь приветствуется.

ОБНОВЛЕНИЕ 1

Я также проверил этот QA с этого сайта, и мой код соответствует этому.

1 ответ

Решение

После некоторого времени исследований и испытаний я нашел решение для вышеуказанной проблемы:

Я изменился

$channel->basic_consume('first_queue', '', false, false, false, false, $callback);

в

$channel->basic_consume('first_queue', '', false, true, false, false, $callback);

и это сделало трюк.

Другие вопросы по тегам