Как выполнить блокирующий вызов basic_get с помощью videlalvaro/php-amqplib
Я использую https://github.com/videlalvaro/php-amqplib для выполнения работы rabbitmq:
Я пытаюсь создать блокирующую версию basic_get (или версию basic_consume, которую я могу вызывать многократно и получать только одно сообщение каждый раз), которая будет блокировать до тех пор, пока сообщение не будет готово, а затем вернуть его вместо возврата null, если ни один не в очереди.
Когда я пытаюсь получить единственное сообщение с basic_consume, все смешивается, и я получаю кучу "не готовых", но неиспользованных сообщений. (Если я получаю только одно сообщение таким образом, оно работает каждый раз, если я пытаюсь получить 2 сообщения, оно иногда зависает и работает с другими)
class Foo {
...
private function blockingGet() {
/*
queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback
*/
$this->ch->basic_consume($this->queueName, "consumer_".$this->consumerNum++, false, false, false, false, function($msg) {
$this->msgCache = json_decode($msg->body);
$this->ch->basic_ack($msg->delivery_info['delivery_tag']);
$this->ch->basic_cancel($msg->delivery_info['consumer_tag']);
});
while (count($this->ch->callbacks)) {
$this->ch->wait();
}
return $this->msgCache;
}
}
$q = new Foo();
for ($i = 0; $i < 5; $i++) {
print $q->blockingGet();
}
1 ответ
Я реализовал нечто похожее на то, что вам нужно, сохранив полученное сообщение в закрытии, переданном параметру обратного вызова $channel->basic_consume()
, а затем иметь дело с этим после $channel->wait()
позвони, как wait()
вернет управление, если получено сообщение (или если задан параметр тайм-аута и достигнут тайм-аут). Попробуйте что-то вроде ниже:
class Foo {
// ...
public function __construct() {
$this->ch->basic_consume($this->queueName, "", false, false, false, false, function($msg) {
$this->msgCache = json_decode($msg->body);
$this->ch->basic_ack($msg->delivery_info['delivery_tag']);
});
}
// ...
private function blockingGet() {
$this->ch->wait();
if ($this->msgCache) {
$msgCache = $this->msgCache;
$this->msgCache = null;
return $msgCache;
}
return null;
}
}
$q = new Foo();
for ($i = 0; $i < 5; $i++) {
print $q->blockingGet();
}