Azure IOT 模块 - 无法订阅消息
Azure IOT Module - Unable to subscribe to message
我正在尝试更轻松地使用 Azure IOT,并决定开始发送我自己的遥测数据,而不是他们提供的 "tempSensor" 演示。
我创建了这个 "basic_device" 模块。
import time
import json
import iothub_client
# pylint: disable=E0611
from iothub_client import IoTHubClient, IoTHubTransportProvider, IoTHubMessage, IoTHubError
# Connection string and protocol set
CONNECTION_STRING = "REMOVED-FOR-POST"
PROTOCOL = IoTHubTransportProvider.MQTT
def set_sample_rate(sample_rate=0.1):
"""Creates a decorator that has the given sample_rate."""
def decorate_sample_rate(func):
"""The actual decorator."""
def wrapper(*args, **kwargs):
"""Wrapper method."""
fname = func.__name__
# If first time, use the last time
if fname not in args[0]._time:
args[0]._time[fname] = args[0]._time_last
# Check if it's time to add this message
if args[0]._time_last - args[0]._time[fname] >= sample_rate:
component_msg = func(*args, **kwargs)
for comp in component_msg:
args[1][comp] = component_msg[comp]
args[0]._time[fname] = args[0]._time_last
return args[1]
return wrapper
return decorate_sample_rate
def send_confirmation_callback(message, result, user_context):
"""Send confirmation upon sucessful message."""
print ( "IoT Hub responded to message with status: %s" % (result) )
class SimulateDevice():
def __init__(self, msg_max=100):
self._client = IoTHubClient(CONNECTION_STRING, PROTOCOL)
self._time_start = time.time()
self._time_last = time.time()
self._time = {}
self._msg_count = 0
self._msg_max = msg_max
@set_sample_rate(2)
def _noisy_wave_message(self, msg):
"""Create a message that is noisy."""
dt = time.time() - self._time_start
component_msg = {
'noisy_sinus': np.sin(dt / 100.0) + np.random.normal(0, 0.25, 1)[0],
'noisy_cosine': np.cos(dt / 100.0) + np.random.normal(0, 0.25, 1)[0],
}
return component_msg
def send_message(self):
"""Send a message."""
msg = self._noisy_wave_message({})
if msg and self._msg_count <= self._msg_max:
msg['timeStamp'] = self._time_last
msg_body = json.dumps(msg)
print("Sending message: %s" % msg_body)
iot_msg = IoTHubMessage(msg_body)
iot_msg.message_id = "message_%d" % self._msg_count
self._client.send_event_async(iot_msg, send_confirmation_callback, None)
self._msg_count +=1
self._time_last = time.time()
def start_device():
try:
device = SimulateDevice()
print ( "IoT Hub device sending periodic messages, press Ctrl-C to exit" )
while True:
# Send the message.
device.send_message()
except IoTHubError as iothub_error:
print ( "Unexpected error %s from IoTHub" % iothub_error )
return
except KeyboardInterrupt:
print ( "IoTHubClient sample stopped" )
if __name__ == '__main__':
print ( "IoT Hub Quickstart #1 - Simulated device" )
print ( "Press Ctrl-C to exit" )
start_device()
当我查看日志时,我看到了这个
azureuser@EdgeVM:~$ sudo iotedge logs basic_device
IoT Hub Quickstart #1 - Simulated device
Press Ctrl-C to exit
IoT Hub device sending periodic messages, press Ctrl-C to exit
Sending message: {"noisy_sinus": -0.12927878622262406, "noisy_cosine": 0.5951663552778992, "timeStamp": 1542717185.0867708}
IoT Hub responded to message with status: OK
这么说好像没问题吧?
然后我想创建一个订阅来自该模块的消息的模块。模块比较简单,即
import random
import time
import sys
import iothub_client
# pylint: disable=E0611
from iothub_client import IoTHubModuleClient, IoTHubTransportProvider
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError
MESSAGE_TIMEOUT = 10000
PROTOCOL = IoTHubTransportProvider.MQTT
def noisy_sinus(message, user_context):
print("Received a message")
print(message)
return IoTHubMessageDispositionResult.ACCEPTED
class AnalysisManager():
"""A class that manages different analysis for differnet signals."""
def __init__(self, protocol=IoTHubTransportProvider.MQTT):
self.client_protocol = protocol
self.client = IoTHubModuleClient()
self.client.create_from_environment(protocol)
# set the time until a message times out
self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)
# sets the callback when a message arrives on "input1" queue.
self.client.set_message_callback("input1", noisy_sinus, None)
def main(protocol):
try:
print ( "\nPython %s\n" % sys.version )
print ( "IoT Hub Client for Python" )
hub_manager = AnalysisManager(protocol)
print ( "Starting the IoT Hub Python sample using protocol %s..." % hub_manager.client_protocol )
print ( "The sample is now waiting for messages and will indefinitely. Press Ctrl-C to exit. ")
while True:
time.sleep(1)
except IoTHubError as iothub_error:
print ( "Unexpected error %s from IoTHub" % iothub_error )
return
except KeyboardInterrupt:
print ( "IoTHubModuleClient sample stopped" )
if __name__ == '__main__':
main(PROTOCOL)
并且在路由设置中,我有以下内容
"routes": {
"basic_deviceToIoTHub": "FROM /messages/modules/basic_device/outputs/* INTO $upstream",
"basic_analysisToIoTHub": "FROM /messages/modules/basic_analysis/outputs/* INTO $upstream",
"sensorTobasic_analysis": "FROM /messages/modules/basic_device/outputs/* INTO BrokeredEndpoint(\"/modules/basic_analysis/inputs/input1\")"
},
但是这个 根本没有收到 条消息。
azureuser@EdgeVM:~$ sudo iotedge logs basic_analysis
Python 3.5.2 (default, Nov 23 2017, 16:37:01)
[GCC 5.4.0 20160609]
IoT Hub Client for Python
Starting the IoT Hub Python sample using protocol MQTT...
The sample is now waiting for messages and will indefinitely. Press Ctrl-C to exit.
我在这里错过了什么?如果我也愿意,我也可以从 tempSensor 模块接收消息。
此外,在demo中,可以将消息命名为temperatureOutput,但是在PythonAPI中,并没有这样的创建 send_event_async.
时的选项
好吧,经过一番挖掘,我找到了一个有效的解决方案。
在 basic_device 模块中,我现在使用 IoTHubModuleClient
而不是 IoTHubClient
。
class SimulateDevice():
def __init__(self, protocol, msg_max=100):
self._client_protocol = protocol
self._client = IoTHubModuleClient()
self._client.create_from_environment(self._client_protocol)
然后允许我像这样发送消息
self._client.send_event_async("allmessages", iot_msg, send_confirmation_callback, None)
然后在路由中,我可以使用
"routes": {
"basic_deviceToIoTHub": "FROM /messages/modules/basic_device/outputs/* INTO $upstream",
"basic_analysisToIoTHub": "FROM /messages/modules/basic_analysis/outputs/* INTO $upstream",
"sensorTobasic_analysis": "FROM /messages/modules/basic_device/outputs/allmessages INTO BrokeredEndpoint(\"/modules/basic_analysis/inputs/input1\")"
},
那么分析模块就可以接收到消息了。我不确定为什么在前一个案例中使用“*”不起作用。
编辑:修正语法
我正在尝试更轻松地使用 Azure IOT,并决定开始发送我自己的遥测数据,而不是他们提供的 "tempSensor" 演示。
我创建了这个 "basic_device" 模块。
import time
import json
import iothub_client
# pylint: disable=E0611
from iothub_client import IoTHubClient, IoTHubTransportProvider, IoTHubMessage, IoTHubError
# Connection string and protocol set
CONNECTION_STRING = "REMOVED-FOR-POST"
PROTOCOL = IoTHubTransportProvider.MQTT
def set_sample_rate(sample_rate=0.1):
"""Creates a decorator that has the given sample_rate."""
def decorate_sample_rate(func):
"""The actual decorator."""
def wrapper(*args, **kwargs):
"""Wrapper method."""
fname = func.__name__
# If first time, use the last time
if fname not in args[0]._time:
args[0]._time[fname] = args[0]._time_last
# Check if it's time to add this message
if args[0]._time_last - args[0]._time[fname] >= sample_rate:
component_msg = func(*args, **kwargs)
for comp in component_msg:
args[1][comp] = component_msg[comp]
args[0]._time[fname] = args[0]._time_last
return args[1]
return wrapper
return decorate_sample_rate
def send_confirmation_callback(message, result, user_context):
"""Send confirmation upon sucessful message."""
print ( "IoT Hub responded to message with status: %s" % (result) )
class SimulateDevice():
def __init__(self, msg_max=100):
self._client = IoTHubClient(CONNECTION_STRING, PROTOCOL)
self._time_start = time.time()
self._time_last = time.time()
self._time = {}
self._msg_count = 0
self._msg_max = msg_max
@set_sample_rate(2)
def _noisy_wave_message(self, msg):
"""Create a message that is noisy."""
dt = time.time() - self._time_start
component_msg = {
'noisy_sinus': np.sin(dt / 100.0) + np.random.normal(0, 0.25, 1)[0],
'noisy_cosine': np.cos(dt / 100.0) + np.random.normal(0, 0.25, 1)[0],
}
return component_msg
def send_message(self):
"""Send a message."""
msg = self._noisy_wave_message({})
if msg and self._msg_count <= self._msg_max:
msg['timeStamp'] = self._time_last
msg_body = json.dumps(msg)
print("Sending message: %s" % msg_body)
iot_msg = IoTHubMessage(msg_body)
iot_msg.message_id = "message_%d" % self._msg_count
self._client.send_event_async(iot_msg, send_confirmation_callback, None)
self._msg_count +=1
self._time_last = time.time()
def start_device():
try:
device = SimulateDevice()
print ( "IoT Hub device sending periodic messages, press Ctrl-C to exit" )
while True:
# Send the message.
device.send_message()
except IoTHubError as iothub_error:
print ( "Unexpected error %s from IoTHub" % iothub_error )
return
except KeyboardInterrupt:
print ( "IoTHubClient sample stopped" )
if __name__ == '__main__':
print ( "IoT Hub Quickstart #1 - Simulated device" )
print ( "Press Ctrl-C to exit" )
start_device()
当我查看日志时,我看到了这个
azureuser@EdgeVM:~$ sudo iotedge logs basic_device
IoT Hub Quickstart #1 - Simulated device
Press Ctrl-C to exit
IoT Hub device sending periodic messages, press Ctrl-C to exit
Sending message: {"noisy_sinus": -0.12927878622262406, "noisy_cosine": 0.5951663552778992, "timeStamp": 1542717185.0867708}
IoT Hub responded to message with status: OK
这么说好像没问题吧?
然后我想创建一个订阅来自该模块的消息的模块。模块比较简单,即
import random
import time
import sys
import iothub_client
# pylint: disable=E0611
from iothub_client import IoTHubModuleClient, IoTHubTransportProvider
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError
MESSAGE_TIMEOUT = 10000
PROTOCOL = IoTHubTransportProvider.MQTT
def noisy_sinus(message, user_context):
print("Received a message")
print(message)
return IoTHubMessageDispositionResult.ACCEPTED
class AnalysisManager():
"""A class that manages different analysis for differnet signals."""
def __init__(self, protocol=IoTHubTransportProvider.MQTT):
self.client_protocol = protocol
self.client = IoTHubModuleClient()
self.client.create_from_environment(protocol)
# set the time until a message times out
self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)
# sets the callback when a message arrives on "input1" queue.
self.client.set_message_callback("input1", noisy_sinus, None)
def main(protocol):
try:
print ( "\nPython %s\n" % sys.version )
print ( "IoT Hub Client for Python" )
hub_manager = AnalysisManager(protocol)
print ( "Starting the IoT Hub Python sample using protocol %s..." % hub_manager.client_protocol )
print ( "The sample is now waiting for messages and will indefinitely. Press Ctrl-C to exit. ")
while True:
time.sleep(1)
except IoTHubError as iothub_error:
print ( "Unexpected error %s from IoTHub" % iothub_error )
return
except KeyboardInterrupt:
print ( "IoTHubModuleClient sample stopped" )
if __name__ == '__main__':
main(PROTOCOL)
并且在路由设置中,我有以下内容
"routes": {
"basic_deviceToIoTHub": "FROM /messages/modules/basic_device/outputs/* INTO $upstream",
"basic_analysisToIoTHub": "FROM /messages/modules/basic_analysis/outputs/* INTO $upstream",
"sensorTobasic_analysis": "FROM /messages/modules/basic_device/outputs/* INTO BrokeredEndpoint(\"/modules/basic_analysis/inputs/input1\")"
},
但是这个 根本没有收到 条消息。
azureuser@EdgeVM:~$ sudo iotedge logs basic_analysis
Python 3.5.2 (default, Nov 23 2017, 16:37:01)
[GCC 5.4.0 20160609]
IoT Hub Client for Python
Starting the IoT Hub Python sample using protocol MQTT...
The sample is now waiting for messages and will indefinitely. Press Ctrl-C to exit.
我在这里错过了什么?如果我也愿意,我也可以从 tempSensor 模块接收消息。
此外,在demo中,可以将消息命名为temperatureOutput,但是在PythonAPI中,并没有这样的创建 send_event_async.
时的选项好吧,经过一番挖掘,我找到了一个有效的解决方案。
在 basic_device 模块中,我现在使用 IoTHubModuleClient
而不是 IoTHubClient
。
class SimulateDevice():
def __init__(self, protocol, msg_max=100):
self._client_protocol = protocol
self._client = IoTHubModuleClient()
self._client.create_from_environment(self._client_protocol)
然后允许我像这样发送消息
self._client.send_event_async("allmessages", iot_msg, send_confirmation_callback, None)
然后在路由中,我可以使用
"routes": {
"basic_deviceToIoTHub": "FROM /messages/modules/basic_device/outputs/* INTO $upstream",
"basic_analysisToIoTHub": "FROM /messages/modules/basic_analysis/outputs/* INTO $upstream",
"sensorTobasic_analysis": "FROM /messages/modules/basic_device/outputs/allmessages INTO BrokeredEndpoint(\"/modules/basic_analysis/inputs/input1\")"
},
那么分析模块就可以接收到消息了。我不确定为什么在前一个案例中使用“*”不起作用。
编辑:修正语法