Как выполнить блокирующий вызов basic_get с помощью videlalvaro/php-amqplib

Я использую https://github.com/videlalvaro/php-amqplib для выполнения работы rabbitmq:

Я пытаюсь создать блокирующую версию basic_get (или версию basic_consume, которую я могу вызывать многократно и получать только одно сообщение каждый раз), которая будет блокировать до тех пор, пока сообщение не будет готово, а затем вернуть его вместо возврата null, если ни один не в очереди.

Когда я пытаюсь получить единственное сообщение с basic_consume, все смешивается, и я получаю кучу "не готовых", но неиспользованных сообщений. (Если я получаю только одно сообщение таким образом, оно работает каждый раз, если я пытаюсь получить 2 сообщения, оно иногда зависает и работает с другими)

class Foo {
    ...
    private function blockingGet() {
            /*
                queue: Queue from where to get the messages
                consumer_tag: Consumer identifier
                no_local: Don't receive messages published by this consumer.
                no_ack: Tells the server if the consumer will acknowledge the messages.
                exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
                nowait:
                callback: A PHP Callback
             */
            $this->ch->basic_consume($this->queueName, "consumer_".$this->consumerNum++, false, false, false, false, function($msg) {
                    $this->msgCache = json_decode($msg->body);
                    $this->ch->basic_ack($msg->delivery_info['delivery_tag']);
                    $this->ch->basic_cancel($msg->delivery_info['consumer_tag']);
            });
            while (count($this->ch->callbacks)) {
                    $this->ch->wait();
            }
            return $this->msgCache;
    }
}

$q = new Foo();
for ($i = 0; $i < 5; $i++) {
    print $q->blockingGet();
}

1 ответ

Я реализовал нечто похожее на то, что вам нужно, сохранив полученное сообщение в закрытии, переданном параметру обратного вызова $channel->basic_consume(), а затем иметь дело с этим после $channel->wait() позвони, как wait() вернет управление, если получено сообщение (или если задан параметр тайм-аута и достигнут тайм-аут). Попробуйте что-то вроде ниже:

class Foo {
    // ...
    public function __construct() {
        $this->ch->basic_consume($this->queueName, "", false, false, false, false, function($msg) {
                $this->msgCache = json_decode($msg->body);
                $this->ch->basic_ack($msg->delivery_info['delivery_tag']);
        });
    }
    // ...
    private function blockingGet() {
        $this->ch->wait();
        if ($this->msgCache) {
            $msgCache = $this->msgCache;
            $this->msgCache = null;
            return $msgCache;
        }
        return null;
    }
}

$q = new Foo();
for ($i = 0; $i < 5; $i++) {
    print $q->blockingGet();
}
Другие вопросы по тегам