Eclipse-hono MQTT 适配器:无法处理消息
Eclipse-hono MQTT adapter: cannot process message
我正在尝试在没有容器平台的情况下在我的本地计算机上设置 hono。
我已成功 运行 以下服务和先决条件:
- AMQP 代理 (RabbitMQ)
- InfluxDb
- Hono MQTT 适配器
- Hono 身份验证服务
- Hono 设备注册表
当 MQTT 适配器启动时,我得到以下日志记录:
14:37:48.257 [vert.x-eventloop-thread-0] INFO o.e.h.c.RequestResponseClientConfigProperties - loading credentials for [127.0.0.1] from [/media/data/dev/hono-config/mqtt-adapter.credentials]
14:37:48.257 [main] DEBUG o.e.h.a.m.i.Application$$EnhancerBySpringCGLIB$$e74ec218 - Waiting 20 seconds for application to start up
14:37:48.258 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.472 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - starting attempt [#1] to connect to server [127.0.0.1:5672]
14:37:48.472 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.473 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - starting attempt [#1] to connect to server [127.0.0.1:5672]
14:37:48.473 [vert.x-eventloop-thread-0] INFO o.e.h.c.RequestResponseClientConfigProperties - loading credentials for [127.0.0.1] from [/media/data/dev/hono-config/mqtt-adapter.credentials]
14:37:48.473 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.473 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - starting attempt [#1] to connect to server [127.0.0.1:5672]
14:37:48.473 [vert.x-eventloop-thread-0] INFO o.e.h.c.RequestResponseClientConfigProperties - loading credentials for [127.0.0.1] from [/media/data/dev/hono-config/mqtt-adapter.credentials]
14:37:48.474 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.474 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - starting attempt [#1] to connect to server [127.0.0.1:5672]
14:37:48.474 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.474 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - limiting size of inbound message payload to 8096 bytes
14:37:48.474 [vert.x-eventloop-thread-0] WARN o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - authentication of devices turned off
14:37:48.474 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - Vertx native support: false
14:37:48.476 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - Server uses secure standard port 8883
14:37:48.479 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - OpenSSL [available: false, supports KeyManagerFactory: false]
14:37:48.479 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - using JDK's default SSL engine
14:37:48.480 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - enabling secure protocol [TLSv1.2]
14:37:48.530 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - MQTT server running on 0.0.0.0:8883
14:37:48.530 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - MQTT server running on 0.0.0.0:11883
14:37:48.533 [main] INFO o.e.h.adapter.mqtt.impl.Application - Started Application in 2.542 seconds (JVM running for 3.01)
14:37:48.608 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.611 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.612 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.612 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.614 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.615 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Tenant service
14:37:48.616 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.616 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Messaging
14:37:48.616 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.616 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Device Registration service
14:37:48.617 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.617 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Credentials service
14:37:48.617 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.618 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.618 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Command and Control service
向 MQTT 适配器发送消息 (mosquitto_pub -p 11883 -t telemetry/DEFAULT_TENANT/4711 -m '{"temp": 5}' 时(示例取自 https://www.eclipse.org/hono/user-guide/mqtt-adapter/),我正在登录 MQTT 适配器服务:
14:38:05.208 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connection request from client [client-id: mosqpub|21426-bob-HP-ZB]
14:38:05.213 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - unauthenticated device [clientId: mosqpub|21426-bob-HP-ZB] connected
14:38:05.215 [vert.x-eventloop-thread-0] INFO o.e.h.s.m.LoggingConnectionEventProducer - Connected - ID: mosqpub|21426-bob-HP-ZB, Protocol Adapter: hono-mqtt, Device: null, Data: null
14:38:05.222 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - creating new message sender for telemetry/DEFAULT_TENANT
14:38:05.236 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - creating new client [target: registration/DEFAULT_TENANT]
14:38:05.236 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.RegistrationClientImpl - creating new registration client for [DEFAULT_TENANT]
14:38:05.246 [vert.x-eventloop-thread-0] INFO o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from registration/DEFAULT_TENANT
14:38:05.253 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - creating new client [target: tenant]
14:38:05.254 [vert.x-eventloop-thread-0] DEBUG o.e.h.client.impl.TenantClientImpl - creating new tenant client
14:38:05.254 [vert.x-eventloop-thread-0] INFO o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from tenant
14:38:05.258 [vert.x-eventloop-thread-0] INFO o.e.hono.client.impl.HonoClientImpl - remote server [127.0.0.1:5672] closed connection with error condition: The connections default session closed unexpectedly: : io.vertx.core.impl.NoStackTraceThrowable: Error{condition=amqp:invalid-field, description='Attach rejected: {unknown_destination,"telemetry/DEFAULT_TENANT"}', info=null}
14:38:05.260 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - cannot process message [endpoint: telemetry] from device [tenantId: DEFAULT_TENANT, deviceId: 4711]
org.eclipse.hono.client.ServerErrorException: no connection to service
at org.eclipse.hono.client.impl.HonoClientImpl.lambda$getOrCreateSender(HonoClientImpl.java:619)
at org.eclipse.hono.client.impl.HonoClientImpl.failAllCreationRequests(HonoClientImpl.java:463)
at org.eclipse.hono.client.impl.HonoClientImpl.clearState(HonoClientImpl.java:455)
at org.eclipse.hono.client.impl.HonoClientImpl.handleConnectionLoss(HonoClientImpl.java:437)
at org.eclipse.hono.client.impl.HonoClientImpl.onRemoteClose(HonoClientImpl.java:417)
at org.eclipse.hono.client.impl.HonoClientImpl.lambda$connect(HonoClientImpl.java:372)
at io.vertx.proton.impl.ProtonConnectionImpl.lambda$getDefaultSession(ProtonConnectionImpl.java:256)
at io.vertx.proton.impl.ProtonSessionImpl.fireRemoteClose(ProtonSessionImpl.java:270)
at io.vertx.proton.impl.ProtonTransport.handleSocketBuffer(ProtonTransport.java:125)
at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:384)
at io.vertx.core.net.impl.NetSocketImpl.handleMessageReceived(NetSocketImpl.java:351)
at io.vertx.core.net.impl.NetClientImpl.handleMessage(NetClientImpl.java:242)
at io.vertx.core.net.impl.NetClientImpl.handleMessage(NetClientImpl.java:239)
at io.vertx.core.net.impl.VertxHandler.lambda$channelRead(VertxHandler.java:146)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask(ContextImpl.java:337)
at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:195)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:144)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:844)
14:38:05.261 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - discarding message [topic: telemetry/DEFAULT_TENANT/4711] from device: no connection to service
它试图连接到哪个服务?
一个题外话的小问题:在docker_swarm.sh我也注意到了消息服务,是做什么用的?在 https://www.eclipse.org/hono/getting-started/#starting-hono 上没有提及。
提前致谢!
Hono doesn't (officially) support RabbitMQ as the AMQP 1.0 Messaging Network. However, based on Rabbit's support for AMQP 1.0, you may still get it to work. That said, it looks like the MQTT adapter cannot open a sender link to Rabbit which seems to reject the adapter's request to open a sender on target address telemetry/DEFAULT_TENANT
. This might be due to Rabbit's origins as a message broker which requires to define queues and/or topics before they can be used during runtime.
I am no expert in RabbitMQ but I think you should figure out
- how to configure RabbitMQ so that it dynamically creates queues/topics during runtime when a corresponding link is opened by a peer, or
- define the corresponding queues in RabbitMQ upfront, e.g. by means of adding them to Rabbit's configuration file(s).
If it is not imperative to use RabbitMQ, I would actually recommend to use Qpid Dispatch Router together with Apache MQ Artemis instead. You will get better support from the Hono dev team for it, because that's the official implementation of the AMQP Messaging Network being used for development.
Regarding your side note: Before Hono 0.7, all Protocol Adapters had to connect to the Messaging service, which provided a central point in the message flow where Hono could verify the origin of a telemetry message or event before forwarding it downstream to the AMQP Messaging Network (and hence the consuming applications).
For this purpose, a protocol adapter needed to include in each message a JSON Web Token which it obtained from the Device Registration service for a connected device and which asserted the device's registration status. Hono Messaging would then verify the JWT's signature, thus ensuring that the message origins from an existing (registered) device of a tenant that the protocol adapter is actually enabled for. The idea was that the Device Registry would not issue a JWT for a device that the adapter was not authorized for. This is particularly useful for scenarios where a custom (third party provided) protocol adapter should be integrated with a Hono installation (e.g. for a particular tenant) but that protocol adapter should be prevented from sending messages downstream on behalf or arbitrary devices that do not belong to the tenant that the adapter has been authorized for.
In more recent versions we dropped this component in favor of authorizing the protocol adapters at the AMQP Messaging Network.
Logging
All Hono services log information to standard out. The log level can be set by means of the spring.profiles.active
Java system 属性. If not set, the services will log at INFO level, just providing very limited information about the lifecycle events the component is going through.
In order to turn on more exhaustive debug logging, set the system 属性 like this -Dspring.profiles.active=dev
when starting the JVM.
我正在尝试在没有容器平台的情况下在我的本地计算机上设置 hono。 我已成功 运行 以下服务和先决条件:
- AMQP 代理 (RabbitMQ)
- InfluxDb
- Hono MQTT 适配器
- Hono 身份验证服务
- Hono 设备注册表
当 MQTT 适配器启动时,我得到以下日志记录:
14:37:48.257 [vert.x-eventloop-thread-0] INFO o.e.h.c.RequestResponseClientConfigProperties - loading credentials for [127.0.0.1] from [/media/data/dev/hono-config/mqtt-adapter.credentials]
14:37:48.257 [main] DEBUG o.e.h.a.m.i.Application$$EnhancerBySpringCGLIB$$e74ec218 - Waiting 20 seconds for application to start up
14:37:48.258 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.472 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - starting attempt [#1] to connect to server [127.0.0.1:5672]
14:37:48.472 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.473 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - starting attempt [#1] to connect to server [127.0.0.1:5672]
14:37:48.473 [vert.x-eventloop-thread-0] INFO o.e.h.c.RequestResponseClientConfigProperties - loading credentials for [127.0.0.1] from [/media/data/dev/hono-config/mqtt-adapter.credentials]
14:37:48.473 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.473 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - starting attempt [#1] to connect to server [127.0.0.1:5672]
14:37:48.473 [vert.x-eventloop-thread-0] INFO o.e.h.c.RequestResponseClientConfigProperties - loading credentials for [127.0.0.1] from [/media/data/dev/hono-config/mqtt-adapter.credentials]
14:37:48.474 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.474 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - starting attempt [#1] to connect to server [127.0.0.1:5672]
14:37:48.474 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connecting to AMQP 1.0 container [amqp://127.0.0.1:5672]
14:37:48.474 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - limiting size of inbound message payload to 8096 bytes
14:37:48.474 [vert.x-eventloop-thread-0] WARN o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - authentication of devices turned off
14:37:48.474 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - Vertx native support: false
14:37:48.476 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - Server uses secure standard port 8883
14:37:48.479 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - OpenSSL [available: false, supports KeyManagerFactory: false]
14:37:48.479 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - using JDK's default SSL engine
14:37:48.480 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - enabling secure protocol [TLSv1.2]
14:37:48.530 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - MQTT server running on 0.0.0.0:8883
14:37:48.530 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - MQTT server running on 0.0.0.0:11883
14:37:48.533 [main] INFO o.e.h.adapter.mqtt.impl.Application - Started Application in 2.542 seconds (JVM running for 3.01)
14:37:48.608 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.611 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.612 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.612 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.614 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.615 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Tenant service
14:37:48.616 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.616 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Messaging
14:37:48.616 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.616 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Device Registration service
14:37:48.617 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.617 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Credentials service
14:37:48.617 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connected to AMQP 1.0 container [amqp://127.0.0.1:5672], opening connection ...
14:37:48.618 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.ConnectionFactoryImpl - connection to container [rabbit@bob-HP-ZBook-15-G4] at [amqp://127.0.0.1:5672] open
14:37:48.618 [vert.x-eventloop-thread-0] INFO o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connected to Command and Control service
向 MQTT 适配器发送消息 (mosquitto_pub -p 11883 -t telemetry/DEFAULT_TENANT/4711 -m '{"temp": 5}' 时(示例取自 https://www.eclipse.org/hono/user-guide/mqtt-adapter/),我正在登录 MQTT 适配器服务:
14:38:05.208 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - connection request from client [client-id: mosqpub|21426-bob-HP-ZB]
14:38:05.213 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - unauthenticated device [clientId: mosqpub|21426-bob-HP-ZB] connected
14:38:05.215 [vert.x-eventloop-thread-0] INFO o.e.h.s.m.LoggingConnectionEventProducer - Connected - ID: mosqpub|21426-bob-HP-ZB, Protocol Adapter: hono-mqtt, Device: null, Data: null
14:38:05.222 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - creating new message sender for telemetry/DEFAULT_TENANT
14:38:05.236 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - creating new client [target: registration/DEFAULT_TENANT]
14:38:05.236 [vert.x-eventloop-thread-0] DEBUG o.e.h.c.impl.RegistrationClientImpl - creating new registration client for [DEFAULT_TENANT]
14:38:05.246 [vert.x-eventloop-thread-0] INFO o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from registration/DEFAULT_TENANT
14:38:05.253 [vert.x-eventloop-thread-0] DEBUG o.e.hono.client.impl.HonoClientImpl - creating new client [target: tenant]
14:38:05.254 [vert.x-eventloop-thread-0] DEBUG o.e.h.client.impl.TenantClientImpl - creating new tenant client
14:38:05.254 [vert.x-eventloop-thread-0] INFO o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from tenant
14:38:05.258 [vert.x-eventloop-thread-0] INFO o.e.hono.client.impl.HonoClientImpl - remote server [127.0.0.1:5672] closed connection with error condition: The connections default session closed unexpectedly: : io.vertx.core.impl.NoStackTraceThrowable: Error{condition=amqp:invalid-field, description='Attach rejected: {unknown_destination,"telemetry/DEFAULT_TENANT"}', info=null}
14:38:05.260 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - cannot process message [endpoint: telemetry] from device [tenantId: DEFAULT_TENANT, deviceId: 4711]
org.eclipse.hono.client.ServerErrorException: no connection to service
at org.eclipse.hono.client.impl.HonoClientImpl.lambda$getOrCreateSender(HonoClientImpl.java:619)
at org.eclipse.hono.client.impl.HonoClientImpl.failAllCreationRequests(HonoClientImpl.java:463)
at org.eclipse.hono.client.impl.HonoClientImpl.clearState(HonoClientImpl.java:455)
at org.eclipse.hono.client.impl.HonoClientImpl.handleConnectionLoss(HonoClientImpl.java:437)
at org.eclipse.hono.client.impl.HonoClientImpl.onRemoteClose(HonoClientImpl.java:417)
at org.eclipse.hono.client.impl.HonoClientImpl.lambda$connect(HonoClientImpl.java:372)
at io.vertx.proton.impl.ProtonConnectionImpl.lambda$getDefaultSession(ProtonConnectionImpl.java:256)
at io.vertx.proton.impl.ProtonSessionImpl.fireRemoteClose(ProtonSessionImpl.java:270)
at io.vertx.proton.impl.ProtonTransport.handleSocketBuffer(ProtonTransport.java:125)
at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:384)
at io.vertx.core.net.impl.NetSocketImpl.handleMessageReceived(NetSocketImpl.java:351)
at io.vertx.core.net.impl.NetClientImpl.handleMessage(NetClientImpl.java:242)
at io.vertx.core.net.impl.NetClientImpl.handleMessage(NetClientImpl.java:239)
at io.vertx.core.net.impl.VertxHandler.lambda$channelRead(VertxHandler.java:146)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask(ContextImpl.java:337)
at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:195)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:144)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:844)
14:38:05.261 [vert.x-eventloop-thread-0] DEBUG o.e.h.a.m.i.VertxBasedMqttProtocolAdapter - discarding message [topic: telemetry/DEFAULT_TENANT/4711] from device: no connection to service
它试图连接到哪个服务? 一个题外话的小问题:在docker_swarm.sh我也注意到了消息服务,是做什么用的?在 https://www.eclipse.org/hono/getting-started/#starting-hono 上没有提及。
提前致谢!
Hono doesn't (officially) support RabbitMQ as the AMQP 1.0 Messaging Network. However, based on Rabbit's support for AMQP 1.0, you may still get it to work. That said, it looks like the MQTT adapter cannot open a sender link to Rabbit which seems to reject the adapter's request to open a sender on target address telemetry/DEFAULT_TENANT
. This might be due to Rabbit's origins as a message broker which requires to define queues and/or topics before they can be used during runtime.
I am no expert in RabbitMQ but I think you should figure out
- how to configure RabbitMQ so that it dynamically creates queues/topics during runtime when a corresponding link is opened by a peer, or
- define the corresponding queues in RabbitMQ upfront, e.g. by means of adding them to Rabbit's configuration file(s).
If it is not imperative to use RabbitMQ, I would actually recommend to use Qpid Dispatch Router together with Apache MQ Artemis instead. You will get better support from the Hono dev team for it, because that's the official implementation of the AMQP Messaging Network being used for development.
Regarding your side note: Before Hono 0.7, all Protocol Adapters had to connect to the Messaging service, which provided a central point in the message flow where Hono could verify the origin of a telemetry message or event before forwarding it downstream to the AMQP Messaging Network (and hence the consuming applications).
For this purpose, a protocol adapter needed to include in each message a JSON Web Token which it obtained from the Device Registration service for a connected device and which asserted the device's registration status. Hono Messaging would then verify the JWT's signature, thus ensuring that the message origins from an existing (registered) device of a tenant that the protocol adapter is actually enabled for. The idea was that the Device Registry would not issue a JWT for a device that the adapter was not authorized for. This is particularly useful for scenarios where a custom (third party provided) protocol adapter should be integrated with a Hono installation (e.g. for a particular tenant) but that protocol adapter should be prevented from sending messages downstream on behalf or arbitrary devices that do not belong to the tenant that the adapter has been authorized for.
In more recent versions we dropped this component in favor of authorizing the protocol adapters at the AMQP Messaging Network.
Logging
All Hono services log information to standard out. The log level can be set by means of the spring.profiles.active
Java system 属性. If not set, the services will log at INFO level, just providing very limited information about the lifecycle events the component is going through.
In order to turn on more exhaustive debug logging, set the system 属性 like this -Dspring.profiles.active=dev
when starting the JVM.