使用 python 质子连接到 Eclipse Hono AMQP 适配器
Connect to Eclipse Hono AMQP Adaptor using python proton
我目前正在尝试通过 AMPQ 适配器向 Hono 沙盒发送遥测消息。
尽管我接管了 中看到的部分代码示例(它也应该适用于南桥),但我似乎对 SASL 有点挣扎。
这是我的代码
from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
tenantId = 'xxxx'
deviceId = 'yyyyy'
devicePassword = 'my-secret-password'
class AmqpMessageSender(MessagingHandler):
def __init__(self, server, address):
super(AmqpMessageSender, self).__init__()
self.server = server
self.address = address
def on_start(self, event):
conn = event.container.connect(
self.server,
sasl_enabled=True,
allowed_mechs="PLAIN",
allow_insecure_mechs=True,
user=f'{deviceId}@{tenantId}',
password=devicePassword
)
event.container.create_sender(conn, self.address)
def on_sendable(self, event):
msg = Message(
address=f'{self.address}/{deviceId}',
content_type='application/json',
body={"temp": 5, "transport": "amqp"}
)
event.sender.send(self.msg)
event.sender.close()
def on_connection_error(self, event):
print("Connection Error")
def on_link_error(self, event):
print("Link Error")
def on_transport_error(self, event):
print("Transport Error")
Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5671', f'telemetry/{tenantId}')).run()
如果我 运行 代码,我会收到上下文条件的传输错误
'Expected SASL protocol header: no protocol header found (connection aborted)'
我还尝试使用端口 5672,这让我遇到了 link 错误,并使用了端口 15672(实际上是北桥端口),令我惊讶的是,这并没有导致 SASL 错误,但让我得到了预期的“未授权”错误(因为设备不允许通过北桥连接)
=======
更新=======
再次感谢您的宝贵时间。
关于 a) 因为一旦将代码作为问题的答案,这里的评论就相当有限。我用来模拟设备的代码如下
from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
tenantId = 'xxx'
deviceId = 'yyy'
devicePassword = 'my-secret-password'
class AmqpMessageSender(MessagingHandler):
def __init__(self, server):
super(AmqpMessageSender, self).__init__()
self.server = server
def on_start(self, event):
print("In start")
conn = event.container.connect(
self.server,
sasl_enabled=True,
allowed_mechs="PLAIN",
allow_insecure_mechs=True,
user=f'{deviceId}@{tenantId}',
password=devicePassword
)
print("connection established")
event.container.create_sender(context=conn, target=None)
print("sender created")
def on_sendable(self, event):
print("In Msg send")
event.sender.send(Message(
address=f'telemetry',
properties={
'to': 'telemetry',
'content-type': 'application/json'
},
content_type='application/json',
body={"temp": 5, "transport": "amqp"}
))
event.sender.close()
event.connection.close()
print("Sender & connection closed")
def on_connection_error(self, event):
print("Connection Error")
def on_link_error(self, event):
print("Link Error")
def on_transport_error(self, event):
print("Transport Error")
Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5672')).run()
为了模拟服务器,我不使用 java 客户端,但也使用 python 快速入门示例中的示例代码。我还有一个客户端 class,它像 python 快速启动示例中那样执行 http 调用,服务器 class 做出反应并打印消息 - 因此下面概述的服务器实现应该根据我的理解没问题:
from __future__ import print_function, unicode_literals
import threading
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container
amqpNetworkIp = "hono.eclipseprojects.io"
tenantId = 'xxx'
class AmqpReceiver(MessagingHandler):
def __init__(self, server, address, name):
super(AmqpReceiver, self).__init__()
self.server = server
self.address = address
self._name = name
def on_start(self, event):
conn = event.container.connect(self.server, user="consumer@HONO", password="verysecret")
event.container.create_receiver(conn, self.address)
def on_connection_error(self, event):
print("Connection Error")
def on_link_error(self, event):
print("Link Error")
def on_message(self, event):
print(self._name)
print("Got a message:")
print(event.message.body)
class CentralServer:
def listen_telemetry(self, name):
uri = f'amqp://{amqpNetworkIp}:15672'
address = f'telemetry/{tenantId}'
self.container = Container(AmqpReceiver(uri, address, name))
print("Starting (northbound) AMQP Connection...")
self.thread = threading.Thread(target=lambda: self.container.run(), daemon=True)
self.thread.start()
time.sleep(2)
def stop(self):
# Stop container
print("Stopping (northbound) AMQP Connection...")
self.container.stop()
self.thread.join(timeout=5)
CentralServer().listen_telemetry('cs1')
又过了一天,我找不到我做错了什么我真的希望你能看到我错过了什么:)
br 阿敏
AMQP 协议适配器要求设备send messages via an anonymous terminus。
在您的代码中,这意味着 on_start
方法需要更改为包含 event.container.create_sender(context=conn, target=None)
。
无论如何,AMQP适配器的非TLS端口是5672,所以你应该使用amqp://hono.eclipseprojects.io:5672
作为服务器地址。构造函数的第二个参数 (telemetry
) 无关紧要,可以删除。
还要确保您的租户有消费者 运行。否则,发件人将不会因实际发送消息而获得任何积分 ...
2021 年 10 月 21 日编辑
这段代码对我有用...
class AmqpMessageSender(MessagingHandler):
def __init__(self, server):
super(AmqpMessageSender, self).__init__()
self.server = server
def on_start(self, event):
print("In start")
conn = event.container.connect(
url=self.server,
sasl_enabled=True,
allowed_mechs="PLAIN",
allow_insecure_mechs=True,
user=f'{deviceId}@{tenantId}',
password=devicePassword
)
print("connection established")
event.container.create_sender(context=conn, target=None)
print("sender created")
def on_sendable(self, event):
print("In Msg send")
event.sender.send(Message(
address=f'telemetry',
content_type='application/json',
body="{\"temp\": 5, \"transport\": \"amqp\"}"
))
event.sender.close()
event.connection.close()
print("Sender & connection closed")
def on_connection_error(self, event):
print("Connection Error")
def on_link_error(self, event):
print("Link Error")
def on_transport_error(self, event):
print("Transport Error")
print(event)
Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5672')).run()
我目前正在尝试通过 AMPQ 适配器向 Hono 沙盒发送遥测消息。
尽管我接管了
这是我的代码
from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
tenantId = 'xxxx'
deviceId = 'yyyyy'
devicePassword = 'my-secret-password'
class AmqpMessageSender(MessagingHandler):
def __init__(self, server, address):
super(AmqpMessageSender, self).__init__()
self.server = server
self.address = address
def on_start(self, event):
conn = event.container.connect(
self.server,
sasl_enabled=True,
allowed_mechs="PLAIN",
allow_insecure_mechs=True,
user=f'{deviceId}@{tenantId}',
password=devicePassword
)
event.container.create_sender(conn, self.address)
def on_sendable(self, event):
msg = Message(
address=f'{self.address}/{deviceId}',
content_type='application/json',
body={"temp": 5, "transport": "amqp"}
)
event.sender.send(self.msg)
event.sender.close()
def on_connection_error(self, event):
print("Connection Error")
def on_link_error(self, event):
print("Link Error")
def on_transport_error(self, event):
print("Transport Error")
Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5671', f'telemetry/{tenantId}')).run()
如果我 运行 代码,我会收到上下文条件的传输错误
'Expected SASL protocol header: no protocol header found (connection aborted)'
我还尝试使用端口 5672,这让我遇到了 link 错误,并使用了端口 15672(实际上是北桥端口),令我惊讶的是,这并没有导致 SASL 错误,但让我得到了预期的“未授权”错误(因为设备不允许通过北桥连接)
======= 更新=======
再次感谢您的宝贵时间。
关于 a) 因为一旦将代码作为问题的答案,这里的评论就相当有限。我用来模拟设备的代码如下
from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
tenantId = 'xxx'
deviceId = 'yyy'
devicePassword = 'my-secret-password'
class AmqpMessageSender(MessagingHandler):
def __init__(self, server):
super(AmqpMessageSender, self).__init__()
self.server = server
def on_start(self, event):
print("In start")
conn = event.container.connect(
self.server,
sasl_enabled=True,
allowed_mechs="PLAIN",
allow_insecure_mechs=True,
user=f'{deviceId}@{tenantId}',
password=devicePassword
)
print("connection established")
event.container.create_sender(context=conn, target=None)
print("sender created")
def on_sendable(self, event):
print("In Msg send")
event.sender.send(Message(
address=f'telemetry',
properties={
'to': 'telemetry',
'content-type': 'application/json'
},
content_type='application/json',
body={"temp": 5, "transport": "amqp"}
))
event.sender.close()
event.connection.close()
print("Sender & connection closed")
def on_connection_error(self, event):
print("Connection Error")
def on_link_error(self, event):
print("Link Error")
def on_transport_error(self, event):
print("Transport Error")
Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5672')).run()
为了模拟服务器,我不使用 java 客户端,但也使用 python 快速入门示例中的示例代码。我还有一个客户端 class,它像 python 快速启动示例中那样执行 http 调用,服务器 class 做出反应并打印消息 - 因此下面概述的服务器实现应该根据我的理解没问题:
from __future__ import print_function, unicode_literals
import threading
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container
amqpNetworkIp = "hono.eclipseprojects.io"
tenantId = 'xxx'
class AmqpReceiver(MessagingHandler):
def __init__(self, server, address, name):
super(AmqpReceiver, self).__init__()
self.server = server
self.address = address
self._name = name
def on_start(self, event):
conn = event.container.connect(self.server, user="consumer@HONO", password="verysecret")
event.container.create_receiver(conn, self.address)
def on_connection_error(self, event):
print("Connection Error")
def on_link_error(self, event):
print("Link Error")
def on_message(self, event):
print(self._name)
print("Got a message:")
print(event.message.body)
class CentralServer:
def listen_telemetry(self, name):
uri = f'amqp://{amqpNetworkIp}:15672'
address = f'telemetry/{tenantId}'
self.container = Container(AmqpReceiver(uri, address, name))
print("Starting (northbound) AMQP Connection...")
self.thread = threading.Thread(target=lambda: self.container.run(), daemon=True)
self.thread.start()
time.sleep(2)
def stop(self):
# Stop container
print("Stopping (northbound) AMQP Connection...")
self.container.stop()
self.thread.join(timeout=5)
CentralServer().listen_telemetry('cs1')
又过了一天,我找不到我做错了什么我真的希望你能看到我错过了什么:)
br 阿敏
AMQP 协议适配器要求设备send messages via an anonymous terminus。
在您的代码中,这意味着 on_start
方法需要更改为包含 event.container.create_sender(context=conn, target=None)
。
无论如何,AMQP适配器的非TLS端口是5672,所以你应该使用amqp://hono.eclipseprojects.io:5672
作为服务器地址。构造函数的第二个参数 (telemetry
) 无关紧要,可以删除。
还要确保您的租户有消费者 运行。否则,发件人将不会因实际发送消息而获得任何积分 ...
2021 年 10 月 21 日编辑
这段代码对我有用...
class AmqpMessageSender(MessagingHandler):
def __init__(self, server):
super(AmqpMessageSender, self).__init__()
self.server = server
def on_start(self, event):
print("In start")
conn = event.container.connect(
url=self.server,
sasl_enabled=True,
allowed_mechs="PLAIN",
allow_insecure_mechs=True,
user=f'{deviceId}@{tenantId}',
password=devicePassword
)
print("connection established")
event.container.create_sender(context=conn, target=None)
print("sender created")
def on_sendable(self, event):
print("In Msg send")
event.sender.send(Message(
address=f'telemetry',
content_type='application/json',
body="{\"temp\": 5, \"transport\": \"amqp\"}"
))
event.sender.close()
event.connection.close()
print("Sender & connection closed")
def on_connection_error(self, event):
print("Connection Error")
def on_link_error(self, event):
print("Link Error")
def on_transport_error(self, event):
print("Transport Error")
print(event)
Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5672')).run()