Azure IoT 中心 - "Subscribe" 代码不工作:"connection was refused"

Azure IoT Hub - "Subscribe" code not working: "connection was refused"

当某些数据发布到 MQTT 主题 devices/device_id/messages/events/ 时,我正在尝试从 IoT 中心接收 "notifications"。为了接收数据,我使用了以下主题:devices/device_id/messages/devicebound/。我的订阅代码如下 (Python):

import paho.mqtt.client as mqtt
import ssl, random
from time import sleep

root_ca = "./certs/digicertbaltimoreroot.pem"
public_crt = './certs/rsa_cert.pem'
private_key = './certs/rsa_private.pem'

iothub_name = "myhub"
device_id = "mydevice"
mqtt_url = "{}.azure-devices.net".format(iothub_name)
mqtt_port = 8883
topic = "devices/{}/messages/devicebound/#".format(device_id)

def error_str(rc):
    return "Some error occurred. {}: {}".format(rc, mqtt.error_string(rc))

def on_disconnect(unused_client, unused_userdata, rc):
    print("on_disconnect", error_str(rc))

def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)

def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_message(client, userdata, msg):
    print("Topic: {0} -- Payload: {1}".format(msg.topic, str(msg.payload)))

if __name__ == "__main__":    
    client = mqtt.Client(device_id, protocol=mqtt.MQTTv311)

    client.tls_set(root_ca,
                   certfile = public_crt,
                   keyfile = private_key,
                   cert_reqs = ssl.CERT_REQUIRED,
                   tls_version = ssl.PROTOCOL_TLSv1_2,
                   ciphers = None)

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect

    print("Connecting to Azure IoT Hub...")
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.loop_forever()

当我 运行 时,我得到以下输出:

Connecting to Azure IoT Hub...
Connected with status: 3
('on_disconnect', 'Some error occurred. 5: The connection was refused.')
Connected with status: 3
('on_disconnect', 'Some error occurred. 5: The connection was refused.')

任何人都可以提出我所缺少的东西吗?

根据文档 "Using the MQTT protocol directly",您需要

to download and reference the DigiCert Baltimore Root Certificate. This certificate is the one that Azure uses to secure the connection.

正如您在发布过程中所做的那样:

client.tls_set(ca_certs=path_to_root_cert,     
               certfile=None, keyfile=None, 
               cert_reqs=ssl.CERT_REQUIRED,       
               tls_version=ssl.PROTOCOL_TLSv1, 
               ciphers=None)

此外,您需要设置用户名和密码:

client.username_pw_set(username=iot_hub_name+".azure-devices.net/" + device_id, password=sas_token)

最后,您可以这样订阅

client.subscribe(topic=topic, qos=0)

安全密钥验证设备的示例代码:

import paho.mqtt.client as mqtt
import ssl, random
from time import sleep

path_to_root_cert = "./certs/digicertbaltimoreroot.cer"

iothub_name = "your hub name"
device_id = "device1"
sas_token = "SharedAccessSignature sr=[your hub name].azure-devices.net%2Fdevices%2Fdevice1&sig=[sig]&se=1526955728"
mqtt_url = "{}.azure-devices.net".format(iothub_name)
mqtt_port = 8883
topic = "devices/{}/messages/devicebound/#".format(device_id)

def error_str(rc):
    return "Some error occurred. {}: {}".format(rc, mqtt.error_string(rc))

def on_disconnect(unused_client, unused_userdata, rc):
    print("on_disconnect", error_str(rc))

def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)

def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_message(client, userdata, msg):
    print("Topic: {0} -- Payload: {1}".format(msg.topic, str(msg.payload)))

if __name__ == "__main__":    
    client = mqtt.Client(device_id, protocol=mqtt.MQTTv311)

    client.username_pw_set(username=iothub_name+".azure-devices.net/" + device_id, password=sas_token)

    client.tls_set(ca_certs=path_to_root_cert, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect


    print("Connecting to Azure IoT Hub...")
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.subscribe(topic=topic, qos=0)
    client.loop_forever()

x509 认证设备的示例代码: (需要提供设备证书和私钥文件。不需要SAS token作为密码,留空即可。但仍需设置用户名。)

import paho.mqtt.client as mqtt
import ssl, random
from time import sleep

root_ca = "./certs/digicertbaltimoreroot.cer"
public_crt = './certs/mydevice-public.pem'
private_key = './certs/mydevice-private.pem'

iothub_name = "your hub name"
device_id = "mydevice"
mqtt_url = "{}.azure-devices.net".format(iothub_name)
mqtt_port = 8883
topic = "devices/{}/messages/devicebound/#".format(device_id)

def error_str(rc):
    return "Some error occurred. {}: {}".format(rc, mqtt.error_string(rc))

def on_disconnect(unused_client, unused_userdata, rc):
    print("on_disconnect", error_str(rc))

def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)

def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_message(client, userdata, msg):
    print("Topic: {0} -- Payload: {1}".format(msg.topic, str(msg.payload)))

if __name__ == "__main__":    
    client = mqtt.Client(device_id, protocol=mqtt.MQTTv311)

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect

    client.username_pw_set(username=iothub_name+".azure-devices.net/" + device_id, password="")

    client.tls_set(root_ca,
                   certfile = public_crt,
                   keyfile = private_key,
                   cert_reqs = ssl.CERT_REQUIRED,
                   tls_version = ssl.PROTOCOL_TLSv1_2,
                   ciphers = None)
    client.tls_insecure_set(False)


    print("Connecting to Azure IoT Hub...")
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.subscribe(topic=topic, qos=0)
    client.loop_forever()

:题目是"devices/{device-id}/messages/devicebound/#".

如何创建设备证书文件可以参考here

更新: 这些示例代码用于接收 D2C messages 您可以像这样发送以进行简单测试:

来自device explorer

来自 Azure 门户:

更新:接收D2C消息:

正如我在评论中指出的那样,Azure IoT Hub 不是通用的 MQTT 兄弟,因此您不能直接订阅从设备发送的消息。 要解决此问题,您可以使用 IoT 中心公开的事件中心兼容端点,例如“Read the telemetry from your hub " 部分支持。但不幸的是,Python 不支持这种方式。您可以使用 iothub-explorer CLI 实用程序或创建一个 Node.js or a .NET or Java 基于事件中心的控制台应用程序来读取设备到-来自 IoT 中心的云消息。

正如Rita已经提到的,目前设备端不支持接收D2C消息。

Azure IoT hub 是 IoT 解决方案中的云网关,如下图所示:

Azure IoT 中心基于 endpoints. Currently it provides the device to cloud feature like D2C messages, Device Twin's reported properties and file upload. And from cloud we can use the cloud to device feature 类直接方法、Twin 的所需属性Cloud- 为支持的解决方案和设备端提供服务到设备消息.

I am trying to receive "notifications" from IoT Hub when some data is published to the MQTT topic devices/device_id/messages/events/

这取决于你想从哪一方接收信息。目前 Azure IoT 中心支持两种场景。

  1. 设备端发送Device-to-Cloud消息,云端接收消息(D2C消息仅支持AMQP协议)

  2. 云后端解决方案发送Could-to-device消息,设备端接收消息

    如果您对 Azure IoT 中心有任何想法或反馈,可以通过 Azure IoT Hub - user voice 提交。