Как в RabbitMQ и PHP вернуть задачу обратно в очередь?

Как мне вернуть сообщение обратно в очередь, если результат обработки меня не устроил. Нашел только информацию о подтверждении сообщения, но думаю, что оно меня не устраивает. Мне нужно, что если в результате обработки я получаю параметр RETRY, сообщение добавляется обратно в очередь. И тогда этот рабочий или другой снова поднимает его и пытается обработать.

Например:

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($msg) {
    $condition = json_decode($msg->body);

    if (!$condition) {
        # return to the queue
    }
};

$channel->basic_consume('test', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>

3 ответа

Решение

Установить автоматический флаг no_ack в false

queue: очередь, откуда получать сообщения
consumer_tag: идентификатор потребителя
no_local: не получать сообщения, опубликованные этим потребителем.
no_ack: сообщает серверу, будет ли потребитель подтверждать сообщения.
exclusive: Запрос эксклюзивного доступа потребителя, то есть только этот потребитель может получить доступ к очереди
Нет, подождите:
обратный вызов: обратный вызов PHP

$channel->basic_consume('test', '', false, false, false, false, $callback);

Вы должны использовать подтверждения, если ваш процесс не работает, вы можете игнорировать ack

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($msg) {
    $condition = json_decode($msg->body);

    if (!$condition) {
        // return to the queue 
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
    }else{
        // send ack , remove from queue
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    }
};

$channel->basic_consume('test', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>

Решение оказалось проще, чем я думал, оказалось, что задача была не только в RabbitMQ, но и в области действия переменных. Если кто-то заинтересован в решении, здесь:

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($msg) {
  global $channel;

  $condition = json_decode($msg->body);

  if (!$condition) {
    $msg = new AMQPMessage(json_encode(array(
      'condition' => false
    )));

    $channel->basic_publish($msg, '', 'test');
  }
};

$channel->basic_consume('test', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
  $channel->wait();
}

$channel->close();
$connection->close();
?>

Это может быть немного поздно, но именно так вы должны сделать это с этой версией php-amqplib. "php-amqplib/php-amqplib": "^3.1"Вам нужно установить для параметра no_ack базового метода потребления значение false(по умолчанию), а затем явно указать его в обратном вызове, используя метод nack для объекта AMQPMessage, переданного в обратный вызов.

      <?php
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    
    echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
    
    $connection = new AMQPStreamConnection($AMQP);
    $channel = $connection->channel();
    
    $channel->queue_declare('test', false, false, false, false);
    
    $callback = function($msg) {
        $condition = json_decode($msg->body);
    
        if (!$condition) {
            // message will be added back to the queue
            $msg->nack(true);
        }
    };
    
    $channel->basic_consume('test', '', false, false, false, false, $callback);
    
    while(count($channel->callbacks)) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();
    ?>
Другие вопросы по тегам