Reply to Hono command on AMQP device
I am trying to create a prototype device that can receive commands from Hono and reply to them.
I have installed Hono 1.10.0 and run the following Python code:
import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver
from hono import tenantId, deviceId, devicePassword, device_uri, biz_app_uri
correlation_id = 'myCorrelationId'
command_reply_to = f'command_response/{tenantId}/{correlation_id}'
print("Business application subscribing for the command reply--------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, "consumer@HONO", "verysecret"))
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
# Give it some time to link
time.sleep(5)
print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', f'{deviceId}@{tenantId}', devicePassword))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
# Give it some time to link
time.sleep(2)
print("Business application sending a command------------------------------------------------------------")
msg = Message(
    address=f'command/{tenantId}/{deviceId}',
    reply_to=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    subject="call",
    body="Hello Bob!"
)
#as in example
Container(AmqpSender(biz_app_uri, [msg], "consumer@HONO", "verysecret", address=f'command/{tenantId}')).run()
time.sleep(2)
print("Device sending a command response-----------------------------------------------------------------")
resp = Message(
    address=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    properties={
        'status': 200,
        'device_id': deviceId,
        'tenant_id': tenantId
    },
    subject="call",
    body="Hello Alice!"
)
Container(AmqpSender(device_uri, [resp], f'{deviceId}@{tenantId}', devicePassword)).run()
time.sleep(2)
print("Device stops listening for commands---------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
print("Business application stops listening for command responses----------------------------------------")
cr_container.stop()
cr_thread.join(timeout=5)
print("everything stopped")
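I did this implementation according to my understanding of https://www.eclipse.org/hono/docs/api/command-and-control/ and https://www.eclipse.org/hono/docs/user-guide/amqp-adapter/#sending-a-response-to-a-command.
So far it seems I am not far off, because the device receives the command and sending the message does not show any error either. But nothing arrives at the receiving end. To clarify, the AmqpReceiver implementation works in the scenario where I listen for telemetry data. So if the implementation is supposed to be the same (apart from a different address), that should not be the problem.
I am fairly sure that I am doing something wrong with the address/reply_to in the messages, but I cannot confirm it because the logs in the Hono pods tell me nothing :(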
br
阿敏
======UPDATE===============================
The code I am currently running is as follows:
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import AtLeastOnce
class Amqp(MessagingHandler):
    def __init__(self, server, address, user, password, options=None):
        super(Amqp, self).__init__()
        self.server = server
        self.address = address
        self.user = user
        self.password = password
        self.options = options
        self.connection = None

    def create_connection(self, event):
        self.connection = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=self.user,
            password=self.password
        )
        print("Connection established")

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")

    def on_link_opened(self, event):
        if event.link.is_sender:
            print("Opened sender link")
        if event.link.is_receiver:
            print("Opened receiver link for source address '{0}'".format(event.receiver.source.address))
class AmqpReceiver(Amqp):
    def __init__(self, server, address, user, password, options=None):
        super(AmqpReceiver, self).__init__(server, address, user, password, options)
        self.server = server
        self.user = user
        self.password = password

    def on_start(self, event):
        self.create_connection(event)
        event.container.create_receiver(context=self.connection, source=self.address, options=self.options)
        print("Receiver created")

    def on_message(self, event):
        print(f'Receiver [{self.address}] got message:')
        print(f' {event.message.reply_to}')
        print(f' {event.message.correlation_id}')
        print(f' {event.message.properties}')
        print(f' {event.message.subject}')
        print(f' {event.message.body}')
        # just for test purposes - the device immediately sends the reply if a reply_to is given
        if event.message.reply_to is not None:
            reply_to = event.message.reply_to.split('/')
            tenant_id = reply_to[1]
            device_id = reply_to[2]
            resp = Message(
                address=event.message.reply_to,
                correlation_id=event.message.correlation_id,
                content_type="text/plain",
                properties={
                    'status': 200,
                    'tenant_id': tenant_id,
                    'device_id': device_id
                },
                body=f'Reply on {event.message.body}'
            )
            sender = event.container.create_sender(self.connection, None, options=AtLeastOnce())
            sender.send(resp)
            sender.close()
            print("Reply sent")
class AmqpSender(Amqp):
    def __init__(self, server, messages, user, password, address=None, options=None):
        super(AmqpSender, self).__init__(server, address, user, password, options)
        self.messages = messages

    def on_start(self, event):
        self.create_connection(event)
        event.container.create_sender(context=self.connection, target=self.address)
        print("Sender created")

    def on_sendable(self, event):
        print("In Msg send")
        for msg in self.messages:
            event.sender.send(msg)
        event.sender.close()
        event.connection.close()
        print("Sender & connection closed")
In the test script I use it as follows:
from __future__ import print_function, unicode_literals
import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver
biz_app_uri = f'amqp://localhost:15672'
device_uri = f'amqp://localhost:5672'
tenantId = 'ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8'
deviceId = 'b932fb15-fdbd-4c12-9ed7-40aaa8763412'
biz_app_user = 'consumer@HONO'
biz_app_pw = 'verysecret'
device_user = f'{deviceId}@{tenantId}'
device_pw = 'my-secret-password'
correlation_id = 'myCorrelationId'
command_reply_to = f'command_response/{tenantId}/{correlation_id}'
print("Business application subscribing for command replies-------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, biz_app_user, biz_app_pw))
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
time.sleep(2)
print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', device_user, device_pw))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
time.sleep(2)
print("Business application sending a command------------------------------------------------------------")
msg = Message(
    address=f'command/{tenantId}/{deviceId}',
    reply_to=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    subject="call",
    body="Hello Bob!"
)
#as in example
Container(AmqpSender(biz_app_uri, [msg], biz_app_user, biz_app_pw, address=f'command/{tenantId}')).run()
time.sleep(10)
print("Device stops listening for commands---------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
#print("Business application stops listening ---------------------------------------")
#cr_container.stop()
#cr_thread.join(timeout=5)
#print("everything stopped")
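If I run that code example I get the following logs (see below), and the code gets stuck because the command reply receiver stays open.
Log of the Hono dispatch router: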
2021-11-14 19:08:29.420176 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:29.429734 +0000 SERVER (info) [C115] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36742
2021-11-14 19:08:29.447479 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:29.448213 +0000 ROUTER (info) [C115] Connection Opened: dir=in host=10.42.0.70:36742 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=a782f51c-9679-41fb-a682-8ea603ccf1ac props=
2021-11-14 19:08:29.448316 +0000 ROUTER_CORE (info) [C115][L123] Link attached: dir=out source={command_response/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8/myCorrelationId expire:sess} target={<none> expire:sess}
2021-11-14 19:08:33.423325 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:33.430810 +0000 SERVER (info) [C116] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36868
2021-11-14 19:08:33.445574 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:33.446328 +0000 ROUTER (info) [C116] Connection Opened: dir=in host=10.42.0.70:36868 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=92cb7173-2940-4330-a995-f26eccef0905 props=
2021-11-14 19:08:33.446388 +0000 ROUTER_CORE (info) [C116][L124] Link attached: dir=in source={<none> expire:sess} target={command/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8 expire:sess}
2021-11-14 19:08:33.447762 +0000 ROUTER_CORE (info) [C116][L124] Link detached: del=1 presett=0 psdrop=0 acc=0 rej=0 rel=0 mod=0 delay1=0 delay10=0 blocked=no
Log of the AMQP adapter:
2021-11-14 19:08:31,511 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Connected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null
2021-11-14 19:19:29,875 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Disconnected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null
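The command response sent by your device seems to contain the wrong address. As pointed out in the AMQP Adapter User Guide, the response's address property must be set to the value of the command's reply-to property. This value usually differs from the reply-to value that your application set in the command message, because the protocol adapter needs to encode some additional information into the reply-to address in order to be able to determine the correct device ID and forward the command response downstream.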
So, in your code, you need to inspect the command message on the device side and use its reply-to value as the address value of the command response.
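Apart from that, the AMQP adapter expects the status property in the command response to be of AMQP 1.0 type int (32-bit signed integer). With your code, however, the property value is encoded as an AMQP 1.0 long (64-bit signed integer) by default. To encode it correctly, you need to import the int32 class from proton._data and then set the property value to int32(200). The adapter then accepts the command response and forwards it downstream.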
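To illustrate why the two types are not interchangeable, here is a rough standard-library sketch of the fixed-width AMQP 1.0 encodings of int (format code 0x71) and long (format code 0x81). It is not how proton encodes values internally (a real encoder may pick the compact one-byte variants for small numbers), and the function names are made up for this example. The point is only that the type is part of what goes on the wire, so a receiver that insists on int can tell the difference.

```python
import struct

AMQP_INT = 0x71   # format code: 32-bit signed integer, 4 bytes big-endian
AMQP_LONG = 0x81  # format code: 64-bit signed integer, 8 bytes big-endian


def encode_int(value):
    """Encode value as a fixed-width AMQP 1.0 int (illustrative only)."""
    return struct.pack(">Bi", AMQP_INT, value)


def encode_long(value):
    """Encode value as a fixed-width AMQP 1.0 long (illustrative only)."""
    return struct.pack(">Bq", AMQP_LONG, value)


print(encode_int(200).hex())   # 71000000c8
print(encode_long(200).hex())  # 8100000000000000c8
```

The numeric value 200 is the same in both cases, but the leading format code differs, which is how the receiving side knows which type was sent.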