在 AMQP 设备上回复 Hono 命令

Reply to Hono command on AMQP device

我正在尝试创建一个能够从 hono 接收命令并回复它的原型设备。

我已经安装了 Hono 1.10.0,并运行了以下 Python 代码:

import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver
from hono import tenantId, deviceId, devicePassword, device_uri, biz_app_uri


# End-to-end prototype of a command round trip:
#   1. the business application subscribes for command responses,
#   2. the device subscribes for commands,
#   3. the business application sends a one command,
#   4. the device sends a command response,
#   5. both receivers are stopped.
correlation_id = 'myCorrelationId'
# Address on which the business application listens for the command response.
command_reply_to = f'command_response/{tenantId}/{correlation_id}'

print("Business application subscribing for the command reply--------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, "consumer@HONO", "verysecret"))
# The receiver's container runs in a daemon thread so the script can continue below.
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
# Give it some time to link
time.sleep(5)


print("Device subscribing for commands-------------------------------------------------------------------")
# The device authenticates as '<deviceId>@<tenantId>' and receives from source address 'command'.
c_container = Container(AmqpReceiver(device_uri, f'command', f'{deviceId}@{tenantId}', devicePassword))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
# Give it some time to link
time.sleep(2)


print("Business application sending a command------------------------------------------------------------")
msg = Message(
    address=f'command/{tenantId}/{deviceId}',
    reply_to=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    subject="call",
    body="Hello Bob!"
)
# The sender link targets 'command/<tenantId>'; the device id is only part of the
# message address set above.
Container(AmqpSender(biz_app_uri, [msg], "consumer@HONO", "verysecret", address=f'command/{tenantId}')).run()
time.sleep(2)


print("Device sending a command response-----------------------------------------------------------------")
# NOTE(review): the response address below is the reply-to chosen by the application.
# According to the AMQP adapter user guide it must instead be the reply-to value of the
# command message as received by the device, which usually differs because the adapter
# encodes additional information into it.
# NOTE(review): a plain Python int for 'status' is encoded as an AMQP long; the adapter
# expects a 32-bit int (proton's int32(200)).
resp = Message(
    address=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    properties={
        'status': 200,
        'device_id': deviceId,
        'tenant_id': tenantId
    },
    subject="call",
    body="Hello Alice!"
)
# No link address is passed here, so the sender is created with target=None and the
# message's own address field is the only address given.
Container(AmqpSender(device_uri, [resp], f'{deviceId}@{tenantId}', devicePassword)).run()
time.sleep(2)


print("Device stops listeing for commands----------------------------------------------------------------")
c_container.stop()
# Bounded wait so a receiver thread that does not terminate cannot block the script.
c_thread.join(timeout=5)
print("Business application stops listening for command responsets---------------------------------------")
cr_container.stop()
cr_thread.join(timeout=5)
print("everything stopped")

我是参考 https://www.eclipse.org/hono/docs/api/command-and-control/ 和 https://www.eclipse.org/hono/docs/user-guide/amqp-adapter/#sending-a-response-to-a-command ,并根据我对这些文档的理解完成这个实现的。

目前看起来我没有做错什么,因为设备收到了命令,发送响应消息时也没有显示任何错误。但是接收端什么也没有收到。需要说明的是,同一个 AmqpReceiver 实现在我监听遥测数据的场景中可以正常工作。因此,如果实现应该是相同的(只是地址不同),那么它应该不是问题所在。

我很确定是我在消息的 address/reply_to 上做错了什么,但我无法确认,因为 Hono 各个 pod 的日志没有给出任何有用的信息 :(

br 阿敏

======更新===============================

我目前运行的代码如下

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import AtLeastOnce


class Amqp(MessagingHandler):
    """Common base for the AMQP handlers in this module.

    Stores the connection settings, opens the connection on request and
    prints a short message for connection, transport and link events.
    """

    def __init__(self, server, address, user, password, options=None):
        """Remember the connection settings; no connection is opened here.

        Args:
            server: URL passed to ``container.connect``.
            address: Link address used by the subclasses.
            user: User name for authentication.
            password: Password for authentication.
            options: Optional link options used by the subclasses.
        """
        super().__init__()
        self.connection = None
        self.server, self.address = server, address
        self.user, self.password = user, password
        self.options = options

    def create_connection(self, event):
        """Connect to ``self.server`` with SASL PLAIN and keep the result in ``self.connection``."""
        credentials = {"user": self.user, "password": self.password}
        self.connection = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            **credentials
        )
        # Printed as soon as connect() has returned.
        print("Connection established")

    def on_connection_error(self, event):
        """Print a marker when a connection error event is dispatched."""
        print("Connection Error")

    def on_link_error(self, event):
        """Print a marker when a link error event is dispatched."""
        print("Link Error")

    def on_transport_error(self, event):
        """Print a marker when a transport error event is dispatched."""
        print("Transport Error")

    def on_link_opened(self, event):
        """Print which kind of link was opened; for receivers include the source address."""
        opened_link = event.link
        if opened_link.is_sender:
            print("Opened sender link")
        if opened_link.is_receiver:
            source_address = event.receiver.source.address
            print("Opened receiver link for source address '{0}'".format(source_address))


class AmqpReceiver(Amqp):
    """Receiver that prints every incoming message and answers those carrying a reply-to.

    For test purposes a message with a ``reply_to`` is answered immediately
    with a command response addressed to exactly that ``reply_to`` value.
    """

    def __init__(self, server, address, user, password, options=None):
        """Store the settings via the base class.

        Args:
            server: URL passed to ``container.connect``.
            address: Source address to receive from.
            user: User name for authentication.
            password: Password for authentication.
            options: Optional link options for the receiver link.
        """
        # The base class already stores server, user and password.
        super(AmqpReceiver, self).__init__(server, address, user, password, options)

    def on_start(self, event):
        """Open the connection and create the receiver link on ``self.address``."""
        self.create_connection(event)
        event.container.create_receiver(context=self.connection, source=self.address, options=self.options)
        print("Receiver created")

    def on_message(self, event):
        """Print the received message and, if it has a reply-to, send a response.

        The response uses the received ``reply_to`` as its address and the
        received ``correlation_id``. The tenant and device ids are taken from
        the second and third path segments of the ``reply_to`` address.
        """
        # Local import: int32 is only needed for the status property below.
        from proton import int32

        message = event.message
        print(f'Receiver [{self.address}] got message:')
        print(f'  {message.reply_to}')
        print(f'  {message.correlation_id}')
        print(f'  {message.properties}')
        print(f'  {message.subject}')
        print(f'  {message.body}')

        # Just for test purposes: the device replies immediately if a reply_to is given.
        if message.reply_to is None:
            return

        segments = message.reply_to.split('/')
        if len(segments) < 3:
            # Without tenant and device segments the response properties cannot be
            # filled in; report it instead of raising inside the event handler.
            print(f"Cannot reply, unexpected reply_to format: '{message.reply_to}'")
            return
        tenant_id = segments[1]
        device_id = segments[2]

        resp = Message(
            address=message.reply_to,
            correlation_id=message.correlation_id,
            content_type="text/plain",
            properties={
                # The AMQP adapter expects the status as a 32-bit AMQP int; a
                # plain Python int would be encoded as a 64-bit long.
                'status': int32(200),
                'tenant_id': tenant_id,
                'device_id': device_id
            },
            body=f'Reply on {message.body}'
        )
        # Created with target None: only the message's address field names the destination.
        sender = event.container.create_sender(self.connection, None, options=AtLeastOnce())
        sender.send(resp)
        sender.close()
        print("Reply send")


class AmqpSender(Amqp):
    """Handler that sends a fixed list of messages and then closes link and connection."""

    def __init__(self, server, messages, user, password, address=None, options=None):
        """Store the messages to send; the other settings go to the base class.

        Args:
            server: URL passed to ``container.connect``.
            messages: Iterable of messages sent from ``on_sendable``.
            user: User name for authentication.
            password: Password for authentication.
            address: Optional target address of the sender link.
            options: Optional link options, stored by the base class.
        """
        super().__init__(server, address, user, password, options)
        self.messages = messages

    def on_start(self, event):
        """Open the connection and create the sender link towards ``self.address``."""
        self.create_connection(event)
        container = event.container
        container.create_sender(context=self.connection, target=self.address)
        print("Sender created")

    def on_sendable(self, event):
        """Send every stored message, then close the sender link and the connection."""
        print("In Msg send")
        link = event.sender
        for pending in self.messages:
            link.send(pending)
        link.close()
        event.connection.close()
        print("Sender & connection closed")

在测试脚本中,我按如下方式使用它

from __future__ import print_function, unicode_literals
import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver


# Test script: the business application subscribes for command responses, the device
# subscribes for commands, then the application sends a single command.
# Endpoints used by the business application and by the device.
biz_app_uri = f'amqp://localhost:15672'
device_uri = f'amqp://localhost:5672'
tenantId = 'ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8'
deviceId = 'b932fb15-fdbd-4c12-9ed7-40aaa8763412'

# Credentials; the device user name has the form '<deviceId>@<tenantId>'.
biz_app_user = 'consumer@HONO'
biz_app_pw = 'verysecret'
device_user = f'{deviceId}@{tenantId}'
device_pw = 'my-secret-password'

correlation_id = 'myCorrelationId'
# Address on which the business application listens for the command response.
command_reply_to = f'command_response/{tenantId}/{correlation_id}'


print("Business application subscribing for command replies-------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, biz_app_user, biz_app_pw))
# Each receiver container runs in its own daemon thread so the script can continue.
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
# Give it some time to link
time.sleep(2)

print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', device_user, device_pw))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
# Give it some time to link
time.sleep(2)

print("Business application sending a command------------------------------------------------------------")
msg = Message(
    address=f'command/{tenantId}/{deviceId}',
    reply_to=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    subject="call",
    body="Hello Bob!"
)
# The sender link targets 'command/<tenantId>'; the device id is only part of the
# message address set above.
Container(AmqpSender(biz_app_uri, [msg], biz_app_user, biz_app_pw, address=f'command/{tenantId}')).run()

# Wait before shutting the device receiver down.
time.sleep(10)
print("Device stops listeing for commands----------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
# NOTE(review): shutdown of the command-reply receiver is disabled below, so
# cr_container is never stopped explicitly by this script.
#print("Business application stops listening ---------------------------------------")
#cr_container.stop()
#cr_thread.join(timeout=5)
#print("everything stopped")

如果我运行该代码示例,会得到以下日志(见下文),并且程序会卡住,因为命令回复接收器一直保持打开状态。

Hono 调度路由器(dispatch router)中的日志:

2021-11-14 19:08:29.420176 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:29.429734 +0000 SERVER (info) [C115] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36742
2021-11-14 19:08:29.447479 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:29.448213 +0000 ROUTER (info) [C115] Connection Opened: dir=in host=10.42.0.70:36742 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=a782f51c-9679-41fb-a682-8ea603ccf1ac props=
2021-11-14 19:08:29.448316 +0000 ROUTER_CORE (info) [C115][L123] Link attached: dir=out source={command_response/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8/myCorrelationId expire:sess} target={<none> expire:sess}
2021-11-14 19:08:33.423325 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:33.430810 +0000 SERVER (info) [C116] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36868
2021-11-14 19:08:33.445574 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:33.446328 +0000 ROUTER (info) [C116] Connection Opened: dir=in host=10.42.0.70:36868 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=92cb7173-2940-4330-a995-f26eccef0905 props=
2021-11-14 19:08:33.446388 +0000 ROUTER_CORE (info) [C116][L124] Link attached: dir=in source={<none> expire:sess} target={command/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8 expire:sess}
2021-11-14 19:08:33.447762 +0000 ROUTER_CORE (info) [C116][L124] Link detached: del=1 presett=0 psdrop=0 acc=0 rej=0 rel=0 mod=0 delay1=0 delay10=0 blocked=no

AMQP 适配器中的日志:

2021-11-14 19:08:31,511 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Connected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null
2021-11-14 19:19:29,875 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Disconnected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null

您的设备发送的命令响应似乎使用了错误的地址。正如 AMQP 适配器用户指南(AMQP Adapter User Guide)中指出的,响应消息的 address 属性必须设置为所收到命令消息的 reply-to 属性的值。该值通常与您的应用程序在命令消息中设置的 reply-to 值不同,因为协议适配器需要在 reply-to 地址中编码一些附加信息,以便在向下游转发命令响应时能够确定正确的设备 ID。

因此,在您的代码中,设备端需要检查收到的命令消息,并将其 reply-to 值用作命令响应的 address 值。

除此之外,AMQP 适配器要求命令响应中的 status 属性为 AMQP 1.0 的 int 类型(32 位有符号整数)。但是,在您的代码中,该属性值默认会被编码为 AMQP 1.0 的 long 类型(64 位有符号整数)。为了正确编码,您需要从 proton._data 导入 int32 类,然后将该属性值设置为 int32(200)。这样适配器就会接受命令响应并将其转发到下游。