Простой пример записи в Java-брокер Qpid с неработающим Qpid Proton
Я новичок в AMQP и пытаюсь написать простое приложение, которое пишет в Qpid Java Broker с API Qpid Proton Messenger. Из коробки Java-брокер Qpid имеет четыре обмена по умолчанию (amq.match, amq.fanout, amq.topic, amq.direct), "порт" AMQP на порту 5672 с povider аутентификации passwordFile. Чтобы исключить безопасность из этого теста, я изменил провайдера аутентификации на Anonymous.
Для написания брокеру я следую этому примеру. В примере не показано, как писать в конкретный обмен или очередь, и я думаю, что моя проблема где-то в этой области. Вот моя уменьшенная версия.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <Windows.h>
#include "proton\message.h"
#include "proton\messenger.h"
int main(int argc, const char* argv[])
{
while (true)
{
pn_message_t * message;
pn_messenger_t * messenger;
pn_data_t * body;
message = pn_message();
messenger = pn_messenger(NULL);
pn_messenger_start(messenger);
printf("set address result: %i\n", pn_message_set_address(message, "amqp://xxx.xxx.xxx.xxx:5672"));
body = pn_message_body(message);
char* msgtext = "howdy";
pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
pn_messenger_put(messenger, message);
printf("\nSent: %i\n", pn_messenger_send(messenger, 1));
pn_messenger_stop(messenger);
pn_messenger_free(messenger);
pn_message_free(message);
Sleep(10);
}
}
pn_messenger_send возвращает 0 (успех). С клиентской машины я вижу, что клиент отправляет сообщения по проводам. Однако на портале управления брокером он показывает ноль клиентских подключений, а виртуальный хост по умолчанию показывает 0 мс / с (0,00 B/ с) входящего. Я ожидаю, что это покажет байты, поступающие от моего тестового клиента. Если я запускаю клиента с брокером или указываю на неверный порт, клиент потерпит неудачу при вызове pn_messenger_send, поэтому я знаю, что по крайней мере говорю с брокером.
У меня вопрос, куда идут эти сообщения? Как определить тип обмена и очередь в строке подключения для сообщения? Я искал и искал и ничего не нашел. Любые ссылки на документацию или учебники, которые я мог пропустить, приветствуются.
Вот некоторые скриншоты конфигурации брокера для справки.
Спасибо!
1 ответ
Если вы пытаетесь отправить сообщения в определенную очередь, ваш адрес должен иметь вид:
amqp://host:port/test-queue-1
Вы также можете отправить сообщение на биржу:
amqp://host:port/test-exchange-1
В вашем случае вы отправляете свои сообщения без адресата, указанного в адресе. Поэтому сообщения отправляются на прямой обмен по умолчанию. Все очереди привязаны к прямой по умолчанию, с именем очереди в качестве ключа привязки. Но вы также не устанавливаете ключ маршрутизации в своих сообщениях. Они, вероятно, отбрасываются как не маршрутизируемые - я не помню, есть ли в qpid реализация очереди недоставленных писем.
РЕДАКТИРОВАТЬ: неотправляемые сообщения могут быть удалены
Чтобы указать ключ маршрутизации в вашем сообщении, используйте:
pn_message_set_subject(message, "my-routing-key");
Если вы имеете в виду настраиваемый обмен и настройку очереди, вам необходимо предварительно определить привязку обмена и очереди, чтобы обеспечить правильную маршрутизацию сообщений. Я предполагаю, что c-привязки существуют для программного определения и настройки обменов, но я не знаком с этим.
Вы можете создавать обмены, очереди и привязки, используя qpid-config
команда в командной строке или REST API через приложение webadmin, например:
Создайте долговременную очередь, используя qpid-config:
qpid-config add queue test-queue-1 --durable
или API REST
#create a durable queue
curl -X PUT -d '{"durable":true}' http://localhost:8080/rest/queue/<vhostname>/test-queue-1
Создание обмена
qpid-config add exchange direct test-exchange-1 --durable
Создание привязки между очередью и обменом:
qpid-config bind test-exchange-1 test-queue-1 test-queue-binding-key
Любое сообщение, которое вы отправляете на биржу с ключом маршрутизации test-queue-binding-key
в конечном итоге окажется в test-queue-1
,
ref: qpid-config
ref: REST API