Google IoT - 接收通知的正确模式(订阅工作)
Google IoT - Right mode to receive notifications (subscribe working)
我正在关注 this tutorial 并且我已经有我的代码将消息发布到 /devices/sm1/events
主题,其中 sm1
是我的设备 ID。
我想知道如何订阅这个主题,因为教程说要使用 /devices/sm1/config
,但我收到的是空消息。我已经尝试使用与发布 (/devices/sm1/events
) 相同的 "path",但它也没有用。
很奇怪,我给主题起的名字是 sm1
,而与我的设备关联的主题在 GoogleIoT 控制台上显示为 projects/myprojectname/topics/sm1
。因此,除了发现如何订阅上述主题外,我还感谢与在 GoogleIoT 中正确使用 pub/sub 主题相关的任何解释(文档不是很清楚)。
这是我的 subscribe.py
:
mqtt_url = "mqtt.googleapis.com"
mqtt_port = 8883
topic = "/devices/sm1/config"
def on_connect(client, userdata, flags, response_code):
print("Connected with status: {0}".format(response_code))
client.subscribe(topic, 1)
def on_message(client, userdata, msg):
print("Topic: {0} -- Payload: {1}".format(msg.topic, msg.payload))
if __name__ == "__main__":
client = mqtt.Client("projects/{}/locations/{}/registries/{}/devices/{}".format(
project_id,
cloud_region,
registry_id,
device_id))
client.username_pw_set(username='unused',
password=jwt_maker.create_jwt(project_id,
private_key,
algorithm="RS256"))
client.tls_set(root_ca,
certfile = public_crt,
keyfile = private_key,
cert_reqs = ssl.CERT_REQUIRED,
tls_version = ssl.PROTOCOL_TLSv1_2,
ciphers = None)
client.on_connect = on_connect
client.on_message = on_message
print "Connecting to Google IoT Broker..."
client.connect(mqtt_url, mqtt_port, keepalive=60)
client.loop_forever()
我的输出:
Connected with status: 0
Topic: /devices/sm1/config -- Payload:
Topic: /devices/sm1/config -- Payload:
编辑:根据下面的评论进行澄清...
这里有两个 GCP 组件在起作用。有 MQTT 主题(即 /events 主题),设备使用它与 IoT Core 对话。然后是 projects/myprojectname/topics/sm1
,它不在 IoT Core 中,它在 Pub/Sub 中。当您将消息发送到 /events MQTT 主题时,IoT Core 会代理从您的设备发送到 /events MQTT 主题的有效负载到创建并附加到您的设备所在的 IoT Core 注册表的 Pub/Sub 主题已注册。
要查看这些消息,您必须在 Pub/Sub 中创建主题 projects/myprojectname/topics/sm1
的订阅。如果您转到控制台,然后 Pub/Sub->topics。单击主题旁边的三个点和 select "New subscription"。订阅后,您可以从您的设备发送一些数据,然后在命令行上您可以 运行(假设您安装了 gcloud 工具):
gcloud beta pubsub subscriptions pull --max-messages=3 <subscription_id>
要对消息执行任何操作,您可以编写订阅 Pub/Sub 主题的脚本(查看 Pub/Sub API)以触发添加到主题的消息。
原始消息:
您要向设备发送配置消息吗?混淆可能是 MQTT 主题是 one-directional.
因此:1) /events 主题用于设备->IoT 核心。 2) /config 主题用于 IoT Core Admin SDK->device
在某个地方的另一个脚本中,或者从 IoT Core UI 界面,您需要发送配置消息才能看到 on_message 正确触发。
在 IoT Core UI(在 console.cloud.google.com 上),您可以深入查看已注册的单个设备,然后在屏幕顶部单击 "Update Config"。将出现一个弹出窗口 window,让您可以向该设备发送文本或 base64 编码的消息,它会进入 /config 主题。
阅读@GabeWeiss 的回答中评论部分的讨论后,我认为您对想要实现的目标以及您尝试使用的方式(或目的)有些困惑 Pub/Sub.
鉴于我认为这里的问题更概念化,让我首先向您介绍一个关于 Cloud IoT Core key concepts 的通用文档页面,您实际上可以在其中找到有关 Cloud IoT Core 和 Pub/Sub。总之,设备遥测数据被发布到 Cloud IoT Core 主题中,稍后通过 Data Broker 发布到 Cloud Pub/Sub 主题中,您可以在外部将其用于其他目的:触发 Cloud Functions,分析使用 Dataflow 等的数据流
现在,如果您关注 Quickstart guide of Cloud IoT Core, you will see how, at a given point, you create a device registry which is bound to a Pub/Sub topic where device telemetry events are published. If instead of writing to the default Pub/Sub topic you wish to write to multiple topics, you can follow the explanations under this other section on the documentation。
最后,关于订阅主题(Cloud IoT Core 主题,而不是 Pub/Sub 主题,因为只有前者与设备相关),您可以订阅 MQTT 主题 the following command,其中(例如)设备正在订阅配置更新发布的配置主题:
# This is the topic that the device will receive configuration updates on.
mqtt_config_topic = '/devices/{}/config'.format(device_id)
# Subscribe to the config topic.
client.subscribe(mqtt_config_topic, qos=1)
然后,使用 the on_message()
function,您可以处理在您实际订阅的主题上发布的消息。请注意,有效负载必须解析为字符串 (str(message.payload)
).
def on_message(unused_client, unused_userdata, message):
payload = str(message.payload)
print('Received message \'{}\' on topic \'{}\' with Qos {}'.format(
payload, message.topic, str(message.qos)))
然后,在您的问题中您声明您首先订阅了 /devices/{device-id}/config
,但这可能不是您想要的,因为这是发布配置更新的主题(即不是发布遥测事件的地方) ).然后,我了解到您应该订阅 /devices/{device-id}/events
,这是发布遥测指标的实际 MQTT 主题。如果这不起作用,则可能存在另一个相关问题,因此请确保您正确解析 message
变量,并可能尝试使用注册表创建 use Pub/Sub with the default topic 以检查遥测指标是否正确是否正确发布。
我不得不投降到 Google Pub/Sub 图书馆,以便接收与我指定主题相关的通知。
我的发布代码(仅重要部分):
mqtt_url = "mqtt.googleapis.com"
mqtt_port = 8883
mqtt_topic = "/devices/sm1/events"
client = mqtt.Client("projects/{}/locations/{}/registries/{}/devices/{}".format(
project_id,
cloud_region,
registry_id,
device_id), protocol=mqtt.MQTTv311)
client.username_pw_set(username='unused',
password=jwt_maker.create_jwt(project_id,
private_key,
algorithm="RS256"))
client.tls_set(root_ca,
certfile = public_crt,
keyfile = private_key,
cert_reqs = ssl.CERT_REQUIRED,
tls_version = ssl.PROTOCOL_TLSv1_2,
ciphers = None)
client.connect(mqtt_url, mqtt_port, keepalive=60)
res = client.publish(mqtt_topic, some_data, qos=1)
在 Google 云平台门户中,我必须在 Pub/Sub 部分创建一个订阅,将其分配给我创建的主题,这已经是我的默认主题(可能链接到 /事件主题)。创建的订阅具有以下格式:
projects/project_name/subscriptions/subscription_name
我的订阅代码,使用 Google Pub/Sub 库,因为无法使用 MQTT 协议:
from google.cloud import pubsub
def callback(message):
print(message.data)
message.ack()
project_id = "project_name"
subscription_name = "sm1"
subscriber = pubsub.SubscriberClient()
subscription_name = 'projects/{}/subscriptions/{}'.format(project_id, subscription_name)
subscription = subscriber.subscribe(subscription_name)
future = subscription.open(callback)
try:
future.result()
except Exception as ex:
subscription.close()
raise
我希望这可以帮助任何人。更多详情 can be found here(github).
我正在关注 this tutorial 并且我已经有我的代码将消息发布到 /devices/sm1/events
主题,其中 sm1
是我的设备 ID。
我想知道如何订阅这个主题,因为教程说要使用 /devices/sm1/config
,但我收到的是空消息。我已经尝试使用与发布 (/devices/sm1/events
) 相同的 "path",但它也没有用。
很奇怪,我给主题起的名字是 sm1
,而与我的设备关联的主题在 GoogleIoT 控制台上显示为 projects/myprojectname/topics/sm1
。因此,除了发现如何订阅上述主题外,我还感谢与在 GoogleIoT 中正确使用 pub/sub 主题相关的任何解释(文档不是很清楚)。
这是我的 subscribe.py
:
mqtt_url = "mqtt.googleapis.com"
mqtt_port = 8883
topic = "/devices/sm1/config"
def on_connect(client, userdata, flags, response_code):
print("Connected with status: {0}".format(response_code))
client.subscribe(topic, 1)
def on_message(client, userdata, msg):
print("Topic: {0} -- Payload: {1}".format(msg.topic, msg.payload))
if __name__ == "__main__":
client = mqtt.Client("projects/{}/locations/{}/registries/{}/devices/{}".format(
project_id,
cloud_region,
registry_id,
device_id))
client.username_pw_set(username='unused',
password=jwt_maker.create_jwt(project_id,
private_key,
algorithm="RS256"))
client.tls_set(root_ca,
certfile = public_crt,
keyfile = private_key,
cert_reqs = ssl.CERT_REQUIRED,
tls_version = ssl.PROTOCOL_TLSv1_2,
ciphers = None)
client.on_connect = on_connect
client.on_message = on_message
print "Connecting to Google IoT Broker..."
client.connect(mqtt_url, mqtt_port, keepalive=60)
client.loop_forever()
我的输出:
Connected with status: 0
Topic: /devices/sm1/config -- Payload:
Topic: /devices/sm1/config -- Payload:
编辑:根据下面的评论进行澄清...
这里有两个 GCP 组件在起作用。有 MQTT 主题(即 /events 主题),设备使用它与 IoT Core 对话。然后是 projects/myprojectname/topics/sm1
,它不在 IoT Core 中,它在 Pub/Sub 中。当您将消息发送到 /events MQTT 主题时,IoT Core 会代理从您的设备发送到 /events MQTT 主题的有效负载到创建并附加到您的设备所在的 IoT Core 注册表的 Pub/Sub 主题已注册。
要查看这些消息,您必须在 Pub/Sub 中创建主题 projects/myprojectname/topics/sm1
的订阅。如果您转到控制台,然后 Pub/Sub->topics。单击主题旁边的三个点和 select "New subscription"。订阅后,您可以从您的设备发送一些数据,然后在命令行上您可以 运行(假设您安装了 gcloud 工具):
gcloud beta pubsub subscriptions pull --max-messages=3 <subscription_id>
要对消息执行任何操作,您可以编写订阅 Pub/Sub 主题的脚本(查看 Pub/Sub API)以触发添加到主题的消息。
原始消息:
您要向设备发送配置消息吗?混淆可能是 MQTT 主题是 one-directional.
因此:1) /events 主题用于设备->IoT 核心。 2) /config 主题用于 IoT Core Admin SDK->device
在某个地方的另一个脚本中,或者从 IoT Core UI 界面,您需要发送配置消息才能看到 on_message 正确触发。
在 IoT Core UI(在 console.cloud.google.com 上),您可以深入查看已注册的单个设备,然后在屏幕顶部单击 "Update Config"。将出现一个弹出窗口 window,让您可以向该设备发送文本或 base64 编码的消息,它会进入 /config 主题。
阅读@GabeWeiss 的回答中评论部分的讨论后,我认为您对想要实现的目标以及您尝试使用的方式(或目的)有些困惑 Pub/Sub.
鉴于我认为这里的问题更概念化,让我首先向您介绍一个关于 Cloud IoT Core key concepts 的通用文档页面,您实际上可以在其中找到有关 Cloud IoT Core 和 Pub/Sub。总之,设备遥测数据被发布到 Cloud IoT Core 主题中,稍后通过 Data Broker 发布到 Cloud Pub/Sub 主题中,您可以在外部将其用于其他目的:触发 Cloud Functions,分析使用 Dataflow 等的数据流
现在,如果您关注 Quickstart guide of Cloud IoT Core, you will see how, at a given point, you create a device registry which is bound to a Pub/Sub topic where device telemetry events are published. If instead of writing to the default Pub/Sub topic you wish to write to multiple topics, you can follow the explanations under this other section on the documentation。
最后,关于订阅主题(Cloud IoT Core 主题,而不是 Pub/Sub 主题,因为只有前者与设备相关),您可以订阅 MQTT 主题 the following command,其中(例如)设备正在订阅配置更新发布的配置主题:
# This is the topic that the device will receive configuration updates on.
mqtt_config_topic = '/devices/{}/config'.format(device_id)
# Subscribe to the config topic.
client.subscribe(mqtt_config_topic, qos=1)
然后,使用 the on_message()
function,您可以处理在您实际订阅的主题上发布的消息。请注意,有效负载必须解析为字符串 (str(message.payload)
).
def on_message(unused_client, unused_userdata, message):
payload = str(message.payload)
print('Received message \'{}\' on topic \'{}\' with Qos {}'.format(
payload, message.topic, str(message.qos)))
然后,在您的问题中您声明您首先订阅了 /devices/{device-id}/config
,但这可能不是您想要的,因为这是发布配置更新的主题(即不是发布遥测事件的地方) ).然后,我了解到您应该订阅 /devices/{device-id}/events
,这是发布遥测指标的实际 MQTT 主题。如果这不起作用,则可能存在另一个相关问题,因此请确保您正确解析 message
变量,并可能尝试使用注册表创建 use Pub/Sub with the default topic 以检查遥测指标是否正确是否正确发布。
我不得不投降到 Google Pub/Sub 图书馆,以便接收与我指定主题相关的通知。
我的发布代码(仅重要部分):
mqtt_url = "mqtt.googleapis.com"
mqtt_port = 8883
mqtt_topic = "/devices/sm1/events"
client = mqtt.Client("projects/{}/locations/{}/registries/{}/devices/{}".format(
project_id,
cloud_region,
registry_id,
device_id), protocol=mqtt.MQTTv311)
client.username_pw_set(username='unused',
password=jwt_maker.create_jwt(project_id,
private_key,
algorithm="RS256"))
client.tls_set(root_ca,
certfile = public_crt,
keyfile = private_key,
cert_reqs = ssl.CERT_REQUIRED,
tls_version = ssl.PROTOCOL_TLSv1_2,
ciphers = None)
client.connect(mqtt_url, mqtt_port, keepalive=60)
res = client.publish(mqtt_topic, some_data, qos=1)
在 Google 云平台门户中,我必须在 Pub/Sub 部分创建一个订阅,将其分配给我创建的主题,这已经是我的默认主题(可能链接到 /事件主题)。创建的订阅具有以下格式:
projects/project_name/subscriptions/subscription_name
我的订阅代码,使用 Google Pub/Sub 库,因为无法使用 MQTT 协议:
from google.cloud import pubsub
def callback(message):
print(message.data)
message.ack()
project_id = "project_name"
subscription_name = "sm1"
subscriber = pubsub.SubscriberClient()
subscription_name = 'projects/{}/subscriptions/{}'.format(project_id, subscription_name)
subscription = subscriber.subscribe(subscription_name)
future = subscription.open(callback)
try:
future.result()
except Exception as ex:
subscription.close()
raise
我希望这可以帮助任何人。更多详情 can be found here(github).