Как задержать? - php-amqplib
Я хотел бы знать, как отложить с Amqpphplib.
Я использовал этот замечательный учебник сценариев кофе:
https://github.com/jamescarr/rabbitmq-scheduled-delivery
но, похоже, он не работает с PHP-amqplib.
Срок действия сообщения истекает, как я хочу, но кажется, что "x-dead-letter-exchange" не работает. Я использовал консоль управления RabbitMQ, и я вижу все создание и удаление очереди в реальном времени. Но мое сообщение отправляется в прямую очередь после истечения срока действия. Я использую версию RabbitMQ 3.2.3, версию PHP-amqplib 2.2.*.
Вот мой код:
Класс подключения:
class Connection
{
/**
* @var $ch
*/
public $ch;
/**
* @var $consumer_tag
*/
public $consumer_tag;
/**
* @var $exchange
*/
public $exchange;
/**
* @var $conn
*/
public $conn;
public function __construct($host, $port, $user, $password, $vhost)
{
$this->exchange = 'immediate';
$this->queue = 'right.now.queue';
$this->consumer_tag = 'consumer';
$this->conn = new AMQPConnection($host, $port, $user, $password, $vhost);
$this->ch = $this->conn->channel();
$this->ch->exchange_declare($this->exchange, 'direct', false, true, false);
$this->ch->queue_declare($this->queue, false, true, false, false, false);
$this->ch->queue_bind($this->queue, $this->exchange);
}
public function createDelayedQueue ($name, $delay_seconds) {
$this->ch->queue_declare($name, false, false, false, true, true, array(
"x-dead-letter-exchange" => array("S", $this->exchange),
"x-message-ttl" => array("I", $delay_seconds*1000),
"x-expires" => array("I", $delay_seconds*1000+1000)
));
}
}
Опубликовать код
$name = 'send.later.'.$ts;
$amqp->createDelayedQueue($name, 2);
$msg = new AMQPMessage($msg_body, array('content_type' => 'text/plain', 'delivery_mode' => 2));
$amqp->ch->basic_publish($msg);
Код потребителя
$amqp = $this->getContainer()->get('amqp_connexion');
$amqp->ch->basic_consume($amqp->queue, $amqp->consumer_tag, false, false, false, false, function ($msg) {
echo $msg->body;
echo "\n--------\n";
});
$output->writeln('Listening '.$amqp->queue.'...');
// Loop as long as the channel has callbacks registered
while (count($amqp->ch->callbacks)) {
$amqp->ch->wait();
}
2 ответа
Я только что написал упрощенную рабочую версию для php:
/////// simplified ///////
// include the AMQPlib Classes || use an autoloader
// queue/exchange names
$queueRightNow = 'right.now.queue';
$exchangeRightNow = 'right.now.exchange';
$queueDelayed5sec = 'delayed.five.seconds.queue';
$exchangeDelayed5sec = 'delayed.five.seconds.exchange';
$delay = 5; // delay in seconds
// create connection
$AMQPConnection = new \PhpAmqpLib\Connection\AMQPConnection('localhost',5672,'guest','guest');
// create a channel
$channel = $AMQPConnection->channel();
// create the right.now.queue, the exchange for that queue and bind them together
$channel->queue_declare($queueRightNow);
$channel->exchange_declare($exchangeRightNow, 'direct');
$channel->queue_bind($queueRightNow, $exchangeRightNow);
// now create the delayed queue and the exchange
$channel->queue_declare(
$queueDelayed5sec,
false,
false,
false,
true,
true,
array(
'x-message-ttl' => array('I', $delay*1000), // delay in seconds to milliseconds
"x-expires" => array("I", $delay*1000+1000),
'x-dead-letter-exchange' => array('S', $exchangeRightNow) // after message expiration in delay queue, move message to the right.now.queue
)
);
$channel->exchange_declare($exchangeDelayed5sec, 'direct');
$channel->queue_bind($queueDelayed5sec, $exchangeDelayed5sec);
// now create a message und publish it to the delayed exchange
$msg = new \PhpAmqpLib\Message\AMQPMessage(
time(),
array(
'delivery_mode' => 2
)
);
$channel->basic_publish($msg,$exchangeDelayed5sec);
// consume the delayed message
$consumeCallback = function(\PhpAmqpLib\Message\AMQPMessage $msg) {
$messagePublishedAt = $msg->body;
echo 'seconds between publishing and consuming: '
. (time()-$messagePublishedAt) . PHP_EOL;
};
$channel->basic_consume($queueRightNow, '', false, true, false, false, $consumeCallback);
// start consuming
while (count($channel->callbacks) > 0) {
$channel->wait();
}
Если вы выберете транспорт на основе взаимодействия с amqp, вам не нужно будет копаться в деталях. Только несколько вещей, которые нужно сделать:
устанавливать enqueue/amqp-lib
(кстати, вы можете использовать другие транспорты, основанные на amqp ext и отличной Bunny lib) транспорт и enqueue/amqp-tools
,
composer require enqueue/amqp-lib enqueue/amqp-tools
Создайте контекст amqp, добавьте стратегию задержки и отправьте отложенные сообщения:
<?php
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Enqueue\AmqpBunny\AmqpConnectionFactory;
$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDlxDelayStrategy())
$queue = $context->createQueue('foo');
$context->declareQueue($queue);
$message = $context->createMessage('Hello world!');
$context->createProducer()
->setDeliveryDelay(5000) // 5 sec
->send($queue, $message)
;
Кстати, это не единственная доступная стратегия. есть один, основанный на плагине задержки RabbitMQ. Это может быть использовано так же.