Google IoT - 接收通知的正确模式(订阅工作)

Google IoT - Right mode to receive notifications (subscribe working)

我正在关注 this tutorial 并且我已经有我的代码将消息发布到 /devices/sm1/events 主题,其中 sm1 是我的设备 ID。

我想知道如何订阅这个主题,因为教程说要使用 /devices/sm1/config,但我收到的是空消息。我已经尝试使用与发布 (/devices/sm1/events) 相同的 "path",但它也没有用。

很奇怪,我给主题起的名字是 sm1,而与我的设备关联的主题在 GoogleIoT 控制台上显示为 projects/myprojectname/topics/sm1。因此,除了发现如何订阅上述主题外,我还感谢与在 GoogleIoT 中正确使用 pub/sub 主题相关的任何解释(文档不是很清楚)。

这是我的 subscribe.py:

mqtt_url = "mqtt.googleapis.com"
mqtt_port = 8883
topic = "/devices/sm1/config"

def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)

def on_message(client, userdata, msg):
    print("Topic: {0}  --  Payload: {1}".format(msg.topic, msg.payload))

if __name__ == "__main__":    
    client = mqtt.Client("projects/{}/locations/{}/registries/{}/devices/{}".format(
                         project_id,
                         cloud_region,
                         registry_id,
                         device_id))

    client.username_pw_set(username='unused',
                           password=jwt_maker.create_jwt(project_id,
                                               private_key,
                                               algorithm="RS256"))

    client.tls_set(root_ca,
                   certfile = public_crt,
                   keyfile = private_key,
                   cert_reqs = ssl.CERT_REQUIRED,
                   tls_version = ssl.PROTOCOL_TLSv1_2,
                   ciphers = None)


    client.on_connect = on_connect
    client.on_message = on_message

    print "Connecting to Google IoT Broker..."
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.loop_forever()

我的输出:

Connected with status: 0
Topic: /devices/sm1/config -- Payload:
Topic: /devices/sm1/config -- Payload:

编辑:根据下面的评论进行澄清...

这里有两个 GCP 组件在起作用。有 MQTT 主题(即 /events 主题),设备使用它与 IoT Core 对话。然后是 projects/myprojectname/topics/sm1,它不在 IoT Core 中,它在 Pub/Sub 中。当您将消息发送到 /events MQTT 主题时,IoT Core 会代理从您的设备发送到 /events MQTT 主题的有效负载到创建并附加到您的设备所在的 IoT Core 注册表的 Pub/Sub 主题已注册。

要查看这些消息,您必须在 Pub/Sub 中创建主题 projects/myprojectname/topics/sm1 的订阅。如果您转到控制台,然后 Pub/Sub->topics。单击主题旁边的三个点和 select "New subscription"。订阅后,您可以从您的设备发送一些数据,然后在命令行上您可以 运行(假设您安装了 gcloud 工具):

gcloud beta pubsub subscriptions pull --max-messages=3 <subscription_id>

要对消息执行任何操作,您可以编写订阅 Pub/Sub 主题的脚本(查看 Pub/Sub API)以触发添加到主题的消息。

原始消息:

您要向设备发送配置消息吗?混淆可能是 MQTT 主题是 one-directional.

因此:1) /events 主题用于设备->IoT 核心。 2) /config 主题用于 IoT Core Admin SDK->device

在某个地方的另一个脚本中,或者从 IoT Core UI 界面,您需要发送配置消息才能看到 on_message 正确触发。

在 IoT Core UI(在 console.cloud.google.com 上),您可以深入查看已注册的单个设备,然后在屏幕顶部单击 "Update Config"。将出现一个弹出窗口 window,让您可以向该设备发送文本或 base64 编码的消息,它会进入 /config 主题。

阅读@GabeWeiss 的回答中评论部分的讨论后,我认为您对想要实现的目标以及您尝试使用的方式(或目的)有些困惑 Pub/Sub.

鉴于我认为这里的问题更概念化,让我首先向您介绍一个关于 Cloud IoT Core key concepts 的通用文档页面,您实际上可以在其中找到有关 Cloud IoT Core 和 Pub/Sub。总之,设备遥测数据被发布到 Cloud IoT Core 主题中,稍后通过 Data Broker 发布到 Cloud Pub/Sub 主题中,您可以在外部将其用于其他目的:触发 Cloud Functions,分析使用 Dataflow 等的数据流

现在,如果您关注 Quickstart guide of Cloud IoT Core, you will see how, at a given point, you create a device registry which is bound to a Pub/Sub topic where device telemetry events are published. If instead of writing to the default Pub/Sub topic you wish to write to multiple topics, you can follow the explanations under this other section on the documentation

最后,关于订阅主题(Cloud IoT Core 主题,而不是 Pub/Sub 主题,因为只有前者与设备相关),您可以订阅 MQTT 主题 the following command,其中(例如)设备正在订阅配置更新发布的配置主题:

# This is the topic that the device will receive configuration updates on.
mqtt_config_topic = '/devices/{}/config'.format(device_id)

# Subscribe to the config topic.
client.subscribe(mqtt_config_topic, qos=1)

然后,使用 the on_message() function,您可以处理在您实际订阅的主题上发布的消息。请注意,有效负载必须解析为字符串 (str(message.payload)).

def on_message(unused_client, unused_userdata, message):
    payload = str(message.payload)
    print('Received message \'{}\' on topic \'{}\' with Qos {}'.format(
            payload, message.topic, str(message.qos)))

然后,在您的问题中您声明您首先订阅了 /devices/{device-id}/config,但这可能不是您想要的,因为这是发布配置更新的主题(即不是发布遥测事件的地方) ).然后,我了解到您应该订阅 /devices/{device-id}/events,这是发布遥测指标的实际 MQTT 主题。如果这不起作用,则可能存在另一个相关问题,因此请确保您正确解析 message 变量,并可能尝试使用注册表创建 use Pub/Sub with the default topic 以检查遥测指标是否正确是否正确发布。

我不得不投降到 Google Pub/Sub 图书馆,以便接收与我指定主题相关的通知。

我的发布代码(仅重要部分):

mqtt_url = "mqtt.googleapis.com"
mqtt_port = 8883
mqtt_topic = "/devices/sm1/events"

client = mqtt.Client("projects/{}/locations/{}/registries/{}/devices/{}".format(
                     project_id,
                     cloud_region,
                     registry_id,
                     device_id), protocol=mqtt.MQTTv311)

client.username_pw_set(username='unused',
                       password=jwt_maker.create_jwt(project_id,
                                           private_key,
                                           algorithm="RS256"))

client.tls_set(root_ca, 
               certfile = public_crt, 
               keyfile = private_key, 
               cert_reqs = ssl.CERT_REQUIRED, 
               tls_version = ssl.PROTOCOL_TLSv1_2, 
               ciphers = None)

client.connect(mqtt_url, mqtt_port, keepalive=60)
res = client.publish(mqtt_topic, some_data, qos=1)

在 Google 云平台门户中,我必须在 Pub/Sub 部分创建一个订阅,将其分配给我创建的主题,这已经是我的默认主题(可能链接到 /事件主题)。创建的订阅具有以下格式:

projects/project_name/subscriptions/subscription_name

我的订阅代码,使用 Google Pub/Sub 库,因为无法使用 MQTT 协议:

from google.cloud import pubsub

def callback(message):
    print(message.data)
    message.ack()

project_id = "project_name"
subscription_name = "sm1"

subscriber = pubsub.SubscriberClient()
subscription_name = 'projects/{}/subscriptions/{}'.format(project_id, subscription_name)

subscription = subscriber.subscribe(subscription_name)
future = subscription.open(callback)

try:
    future.result()
except Exception as ex:
    subscription.close()
    raise

我希望这可以帮助任何人。更多详情 can be found here(github).