Eclipse-hono MQTT адаптер: не может обработать сообщение

Я пытаюсь настроить hono на моей локальной машине без контейнерной платформы. Мне удалось запустить следующие службы и предварительные условия:

  • AMQP брокер (RabbitMQ)
  • InfluxDb
  • Hono MQTT адаптер
  • Hono auth service
  • Реестр устройств Hono

Когда MQTT-адаптер запускается, я получаю следующую запись:

14:37:48.257 [vert.x-eventloop-thread-0] INFO  o.e.h.c.RequestResponseClientConfigProperties - loading credentials for [127.0.0.1] from [/media/data/dev/hono-config/mqtt-adapter.credentials]
14:37:48.257 [main] DEBUG o.e.h.a.m.i.Application$$EnhancerBySpringCGLIB$$e74ec218 - Waiting 20 seconds for application to start up
14:37:48.258 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.472 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - starting attempt [#1] to connect to server [127.0.0.1:5672]
14:37:48.472 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.473 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - starting attempt [#1] to connect to server [127.0.0.1:5672]
14:37:48.473 [vert.x-eventloop-thread-0] INFO  o.e.h.c.RequestResponseClientConfigProperties - loading credentials for [127.0.0.1] from [/media/data/dev/hono-config/mqtt-adapter.credentials]
14:37:48.473 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.473 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - starting attempt [#1] to connect to server [127.0.0.1:5672]
14:37:48.473 [vert.x-eventloop-thread-0] INFO  o.e.h.c.RequestResponseClientConfigProperties - loading credentials for [127.0.0.1] from [/media/data/dev/hono-config/mqtt-adapter.credentials]
14:37:48.474 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.474 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - starting attempt [#1] to connect to server [127.0.0.1:5672]
14:37:48.474 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.474 [vert.x-eventloop-thread-0] INFO  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - limiting size of inbound message payload to 8096 bytes
14:37:48.474 [vert.x-eventloop-thread-0] WARN  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - authentication of devices turned off
14:37:48.474 [vert.x-eventloop-thread-0] INFO  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - Vertx native support: false
14:37:48.476 [vert.x-eventloop-thread-0] INFO  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - Server uses secure standard port 8883
14:37:48.479 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - OpenSSL [available: false, supports KeyManagerFactory: false]
14:37:48.479 [vert.x-eventloop-thread-0] INFO  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - using JDK's default SSL engine
14:37:48.480 [vert.x-eventloop-thread-0] INFO  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - enabling secure protocol [TLSv1.2]
14:37:48.530 [vert.x-eventloop-thread-0] INFO  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - MQTT server running on 0.0.0.0:8883
14:37:48.530 [vert.x-eventloop-thread-0] INFO  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - MQTT server running on 0.0.0.0:11883
14:37:48.533 [main] INFO  o.e.h.adapter.mqtt.impl.Application - Started Application in 2.542 seconds (JVM running for 3.01)
14:37:48.608 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.611 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.612 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.612 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.614 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.615 [vert.x-eventloop-thread-0] INFO  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Tenant service
14:37:48.616 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.616 [vert.x-eventloop-thread-0] INFO  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Messaging
14:37:48.616 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.616 [vert.x-eventloop-thread-0] INFO  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Device Registration service
14:37:48.617 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.617 [vert.x-eventloop-thread-0] INFO  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Credentials service
14:37:48.617 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.618 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.618 [vert.x-eventloop-thread-0] INFO  o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Command and Control service

При отправке сообщения (mosquitto_pub -p 11883 -t telemetry/DEFAULT_TENANT/4711 -m '{"temp": 5}') на адаптер MQTT (пример взят из https://www.eclipse.org/hono/user-guide/mqtt-adapter/), я получаю следующую регистрацию в сервисе MQTT-адаптера:

14:38:05.208 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connection request from client [client-id: mosqpub|21426-bob-HP-ZB]
14:38:05.213 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - unauthenticated device [clientId: mosqpub|21426-bob-HP-ZB] connected
14:38:05.215 [vert.x-eventloop-thread-0] INFO  o.e.h.s.m.LoggingConnectionEventProducer -    Connected - ID: mosqpub|21426-bob-HP-ZB, Protocol Adapter: hono-mqtt, Device: null, Data: null
14:38:05.222 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - creating new message sender for telemetry/DEFAULT_TENANT
14:38:05.236 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - creating new client [target: registration/DEFAULT_TENANT]
14:38:05.236 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.RegistrationClientImpl - creating new registration client for [DEFAULT_TENANT]
14:38:05.246 [vert.x-eventloop-thread-0] INFO  o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from registration/DEFAULT_TENANT
14:38:05.253 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - creating new client [target: tenant]
14:38:05.254 [vert.x-eventloop-thread-0] DEBUG o.e.h.client.impl.TenantClientImpl - creating new tenant client
14:38:05.254 [vert.x-eventloop-thread-0] INFO  o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from tenant
14:38:05.258 [vert.x-eventloop-thread-0] INFO  o.e.hono.client.impl.HonoClientImpl - remote server [127.0.0.1:5672] closed connection with error condition: The connections default session closed unexpectedly: : io.vertx.core.impl.NoStackTraceThrowable: Error{condition=amqp:invalid-field, description='Attach rejected: {unknown_destination,"telemetry/DEFAULT_TENANT"}', info=null}
14:38:05.260 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - cannot process message [endpoint: telemetry] from device [tenantId: DEFAULT_TENANT, deviceId: 4711]
org.eclipse.hono.client.ServerErrorException: no connection to service
    at org.eclipse.hono.client.impl.HonoClientImpl.lambda$getOrCreateSender$16(HonoClientImpl.java:619)
    at org.eclipse.hono.client.impl.HonoClientImpl.failAllCreationRequests(HonoClientImpl.java:463)
    at org.eclipse.hono.client.impl.HonoClientImpl.clearState(HonoClientImpl.java:455)
    at org.eclipse.hono.client.impl.HonoClientImpl.handleConnectionLoss(HonoClientImpl.java:437)
    at org.eclipse.hono.client.impl.HonoClientImpl.onRemoteClose(HonoClientImpl.java:417)
    at org.eclipse.hono.client.impl.HonoClientImpl.lambda$connect$2(HonoClientImpl.java:372)
    at io.vertx.proton.impl.ProtonConnectionImpl.lambda$getDefaultSession$6(ProtonConnectionImpl.java:256)
    at io.vertx.proton.impl.ProtonSessionImpl.fireRemoteClose(ProtonSessionImpl.java:270)
    at io.vertx.proton.impl.ProtonTransport.handleSocketBuffer(ProtonTransport.java:125)
    at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:384)
    at io.vertx.core.net.impl.NetSocketImpl.handleMessageReceived(NetSocketImpl.java:351)
    at io.vertx.core.net.impl.NetClientImpl$1.handleMessage(NetClientImpl.java:242)
    at io.vertx.core.net.impl.NetClientImpl$1.handleMessage(NetClientImpl.java:239)
    at io.vertx.core.net.impl.VertxHandler.lambda$channelRead$1(VertxHandler.java:146)
    at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:337)
    at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:195)
    at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:144)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:844)
14:38:05.261 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - discarding message [topic: telemetry/DEFAULT_TENANT/4711] from device: no connection to service

К какому сервису он пытается подключиться? Небольшой вопрос не по теме: в docker_swarm.sh я также заметил службу сообщений, для чего она используется? На https://www.eclipse.org/hono/getting-started/ это не упоминается.

Заранее спасибо!

1 ответ

Решение

Hono (официально) не поддерживает RabbitMQ как сеть обмена сообщениями AMQP 1.0. Однако, благодаря поддержке Rabbit для AMQP 1.0, вы все равно можете заставить его работать. Тем не менее, похоже, что адаптер MQTT не может открыть ссылку отправителя на Rabbit, которая, кажется, отклоняет запрос адаптера открыть отправителя по целевому адресу telemetry/DEFAULT_TENANT, Это может быть связано с происхождением Rabbit как брокера сообщений, который требует определения очередей и / или тем, прежде чем их можно будет использовать во время выполнения.

Я не эксперт в RabbitMQ, но я думаю, что вы должны выяснить,

  • как настроить RabbitMQ так, чтобы он динамически создавал очереди / темы во время выполнения, когда одноранговый узел открывает соответствующую ссылку, или
  • определить соответствующие очереди в RabbitMQ заранее, например, путем добавления их в конфигурационные файлы Rabbit.

Если не обязательно использовать RabbitMQ, я бы порекомендовал вместо этого использовать Qpid Dispatch Router вместе с Apache MQ Artemis. Вы получите лучшую поддержку от команды разработчиков Hono, потому что это официальная реализация AMQP Messaging Network, используемая для разработки.

Что касается вашего примечания: до Hono 0.7 все адаптеры протоколов должны были подключаться к службе обмена сообщениями, которая обеспечивала центральную точку в потоке сообщений, где Hono могла проверять источник сообщения или события телеметрии, прежде чем направить его вниз в сеть обмена сообщениями AMQP. (и, следовательно, потребляющие приложения).

Для этого адаптер протокола должен был включать в каждое сообщение веб-токен JSON, полученный им от службы регистрации устройств для подключенного устройства и подтверждающий статус регистрации устройства. Затем Hono Messaging проверит подпись JWT, гарантируя, что сообщение отправляется с существующего (зарегистрированного) устройства клиента, для которого фактически включен адаптер протокола. Идея заключалась в том, что реестр устройств не будет выпускать JWT для устройства, для которого адаптер не был авторизован. Это особенно полезно в сценариях, где настраиваемый (предоставленный третьей стороной) адаптер протокола должен быть интегрирован с установкой Hono (например, для конкретного арендатора), но этот адаптер протокола должен быть защищен от отправки сообщений от имени или произвольных устройств, которые не принадлежат арендатору, для которого был авторизован адаптер.

В более поздних версиях мы отказались от этого компонента в пользу авторизации адаптеров протоколов в сети обмена сообщениями AMQP.

логирование

Все сервисы Hono регистрируют информацию как стандартную. Уровень журнала может быть установлен с помощью spring.profiles.active Системное свойство Java. Если не установлено, службы будут регистрироваться на уровне INFO, просто предоставляя очень ограниченную информацию о событиях жизненного цикла, через которые проходит компонент. Чтобы включить более полное ведение журнала отладки, установите системное свойство следующим образом -Dspring.profiles.active=dev при запуске JVM.

Другие вопросы по тегам