使用 python 质子连接到 Eclipse Hono AMQP 适配器

Connect to Eclipse Hono AMQP Adaptor using python proton

我目前正在尝试通过 AMPQ 适配器向 Hono 沙盒发送遥测消息。 尽管我接管了 中看到的部分代码示例(它也应该适用于南桥),但我似乎对 SASL 有点挣扎。

这是我的代码

from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

tenantId = 'xxxx'
deviceId = 'yyyyy'
devicePassword = 'my-secret-password'


class AmqpMessageSender(MessagingHandler):
    def __init__(self, server, address):
        super(AmqpMessageSender, self).__init__()
        self.server = server
        self.address = address

    def on_start(self, event):
        conn = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=f'{deviceId}@{tenantId}',
            password=devicePassword
        )    
        event.container.create_sender(conn, self.address)

    def on_sendable(self, event):   
        msg = Message(
            address=f'{self.address}/{deviceId}',
            content_type='application/json',
            body={"temp": 5, "transport": "amqp"}
        )
        event.sender.send(self.msg)
        event.sender.close()

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")


Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5671', f'telemetry/{tenantId}')).run()

如果我 运行 代码,我会收到上下文条件的传输错误

'Expected SASL protocol header: no protocol header found (connection aborted)'

我还尝试使用端口 5672,这让我遇到了 link 错误,并使用了端口 15672(实际上是北桥端口),令我惊讶的是,这并没有导致 SASL 错误,但让我得到了预期的“未授权”错误(因为设备不允许通过北桥连接)

======= 更新=======

再次感谢您的宝贵时间。

关于 a) 因为一旦将代码作为问题的答案,这里的评论就相当有限。我用来模拟设备的代码如下

from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

tenantId = 'xxx'
deviceId = 'yyy'
devicePassword = 'my-secret-password'


class AmqpMessageSender(MessagingHandler):
    def __init__(self, server):
        super(AmqpMessageSender, self).__init__()
        self.server = server

    def on_start(self, event):
        print("In start")
        conn = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=f'{deviceId}@{tenantId}',
            password=devicePassword
        )
        print("connection established")
        event.container.create_sender(context=conn, target=None)
        print("sender created")

    def on_sendable(self, event):
        print("In Msg send")
        event.sender.send(Message(
            address=f'telemetry',
            properties={
                'to': 'telemetry',
                'content-type': 'application/json'
            },
            content_type='application/json',
            body={"temp": 5, "transport": "amqp"}
        )) 
        event.sender.close()
        event.connection.close()
        print("Sender & connection closed")

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")

Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5672')).run()

为了模拟服务器,我不使用 java 客户端,但也使用 python 快速入门示例中的示例代码。我还有一个客户端 class,它像 python 快速启动示例中那样执行 http 调用,服务器 class 做出反应并打印消息 - 因此下面概述的服务器实现应该根据我的理解没问题:

from __future__ import print_function, unicode_literals
import threading
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container

amqpNetworkIp = "hono.eclipseprojects.io"
tenantId = 'xxx'


class AmqpReceiver(MessagingHandler):
    def __init__(self, server, address, name):
        super(AmqpReceiver, self).__init__()
        self.server = server
        self.address = address
        self._name = name

    def on_start(self, event):
        conn = event.container.connect(self.server, user="consumer@HONO", password="verysecret")
        event.container.create_receiver(conn, self.address)

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_message(self, event):
        print(self._name)
        print("Got a message:")
        print(event.message.body)


class CentralServer:
    def listen_telemetry(self, name):
        uri = f'amqp://{amqpNetworkIp}:15672'
        address = f'telemetry/{tenantId}'
        self.container = Container(AmqpReceiver(uri, address, name))

        print("Starting (northbound) AMQP Connection...")
        self.thread = threading.Thread(target=lambda: self.container.run(), daemon=True)
        self.thread.start()
        time.sleep(2)

    def stop(self):
        # Stop container
        print("Stopping (northbound) AMQP Connection...")
        self.container.stop()
        self.thread.join(timeout=5)


CentralServer().listen_telemetry('cs1')

又过了一天,我找不到我做错了什么我真的希望你能看到我错过了什么:)

br 阿敏

AMQP 协议适配器要求设备send messages via an anonymous terminus

在您的代码中,这意味着 on_start 方法需要更改为包含 event.container.create_sender(context=conn, target=None)

无论如何,AMQP适配器的非TLS端口是5672,所以你应该使用amqp://hono.eclipseprojects.io:5672作为服务器地址。构造函数的第二个参数 (telemetry) 无关紧要,可以删除。

还要确保您的租户有消费者 运行。否则,发件人将不会因实际发送消息而获得任何积分 ...

2021 年 10 月 21 日编辑

这段代码对我有用...

class AmqpMessageSender(MessagingHandler):
    def __init__(self, server):
        super(AmqpMessageSender, self).__init__()
        self.server = server

    def on_start(self, event):
        print("In start")
        conn = event.container.connect(
            url=self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=f'{deviceId}@{tenantId}',
            password=devicePassword
        )
        print("connection established")
        event.container.create_sender(context=conn, target=None)
        print("sender created")

    def on_sendable(self, event):
        print("In Msg send")
        event.sender.send(Message(
            address=f'telemetry',
            content_type='application/json',
            body="{\"temp\": 5, \"transport\": \"amqp\"}"
        )) 
        event.sender.close()
        event.connection.close()
        print("Sender & connection closed")

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")
        print(event)

Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5672')).run()