AWS IoT SDK Python subscribe returns true 但未调用回调参数
AWS IoT SDK Python subscribe returns true but callback parameter is not being called
关于我正在做的事情的一些概述是将传感器数据从 Raspbery Pi 发布到 AWS,AWS 将数据存储到 DynamoDB 并调用 lambda 函数。然后,此 lambda 函数向 raspberry Pi 订阅的主题发布消息。
所以我的问题是没有调用回调,所以我无法访问从 AWS lambda 发布的消息。我验证了此消息正在发布到 RaspberryPi on AWSIoT 测试订阅的主题。我在 raspberry Pi 上使用 AWSIoTPythonSDK 库,在 AWS lambda 函数上使用 Boto3。
此外,我已经阅读了使用 AWS IoT shadow 的可能解决方案,但这个解决方案已经接近完成 - 我不想放弃我的努力,因为它似乎是一行代码,但不是在职的。
请告诉我有关如何进一步解决此问题的任何想法。
到目前为止,我已经尝试在订阅函数之后打印堆栈,它从堆栈输出:* 我没有让整个循环完成*
pi@raspberrypi:~/eve-pi $ pi@raspberrypi:~/eve-pi $ python3 sensor_random.py
for line in traceback.format_stack():
File "sensor_random.py", line 66, in <module>
for line in traceback.format_stack():
^CTraceback (most recent call last):
File "sensor_random.py", line 68, in <module>
time.sleep(2)
KeyboardInterrupt
-bash: pi@raspberrypi:~/eve-pi: No such file or directory
这里是 Raspberry Pi 代码:*****省略发布代码*********
import json
import time
import pytz
import traceback
import inspect
from time import sleep
from datetime import date, datetime
from random import randint
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
# AWS IoT certificate based connection
# MQQT client is an ID so that the MQTT broker can identify the client
myMQTTClient = AWSIoTMQTTClient("XXXXXXXX")
# this is the unique thing endpoint with the .503 certificate
myMQTTClient.configureEndpoint("XXXXXXXXX.us-west-2.amazonaws.com", 8883)
myMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
myMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz
myMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
myMQTTClient.configureMQTTOperationTimeout(20) # 5 sec
def customCallback(client, userdata, message):
traceback.print_stack()
print('in callback 1')
print(message.payload)
print('in callback 2')
def rand_sensor_data():
print('randomizing sensor data')
for each in payload:
each = randint(1, 51)
try:
rand_sensor_data()
print(payload)
msg = json.dumps(payload)
myMQTTClient.publish("thing01/data", msg, 0)
print('before subscribe')
for x in range(5):
myMQTTClient.subscribe("thing02/water", 0, customCallback)
for line in traceback.format_stack():
print(line.strip())
time.sleep(2)
print('after subscribe')
except KeyboardInterrupt:
GPIO.cleanup()
print('exited')
这是 AWS lambda 代码:
import json
import boto3
def lambda_handler(event, context):
#testing for pi publishing
message = {
'topic': 'thing02/water',
'payload': {'message': 'test'}
}
boto3.client(
'iot-data',
region_name='us-west-2',
aws_access_key_id='<access-key>',
aws_secret_access_key='<secret-access-key'
).publish(
topic='thing02/water',
payload=json.dumps(message),
qos=1
)
print(json.dumps(message))
首先,围绕订阅的循环没有意义,因为 x
从未被使用过,您应该只订阅一次主题。 MQTT 客户端不会在每次调用订阅时轮询主题,它会通知代理它想要所有匹配的消息,然后只是坐下来等待代理发送匹配的消息,直到您取消订阅或断开连接。
您应该将订阅移动到发布之前,然后在发布之前设置并等待响应消息,这消除了客户端丢失消息的任何可能性,因为它仍在尝试处理订阅设置。
关于我正在做的事情的一些概述是将传感器数据从 Raspbery Pi 发布到 AWS,AWS 将数据存储到 DynamoDB 并调用 lambda 函数。然后,此 lambda 函数向 raspberry Pi 订阅的主题发布消息。
所以我的问题是没有调用回调,所以我无法访问从 AWS lambda 发布的消息。我验证了此消息正在发布到 RaspberryPi on AWSIoT 测试订阅的主题。我在 raspberry Pi 上使用 AWSIoTPythonSDK 库,在 AWS lambda 函数上使用 Boto3。
此外,我已经阅读了使用 AWS IoT shadow 的可能解决方案,但这个解决方案已经接近完成 - 我不想放弃我的努力,因为它似乎是一行代码,但不是在职的。
请告诉我有关如何进一步解决此问题的任何想法。
到目前为止,我已经尝试在订阅函数之后打印堆栈,它从堆栈输出:* 我没有让整个循环完成*
pi@raspberrypi:~/eve-pi $ pi@raspberrypi:~/eve-pi $ python3 sensor_random.py
for line in traceback.format_stack():
File "sensor_random.py", line 66, in <module>
for line in traceback.format_stack():
^CTraceback (most recent call last):
File "sensor_random.py", line 68, in <module>
time.sleep(2)
KeyboardInterrupt
-bash: pi@raspberrypi:~/eve-pi: No such file or directory
这里是 Raspberry Pi 代码:*****省略发布代码*********
import json
import time
import pytz
import traceback
import inspect
from time import sleep
from datetime import date, datetime
from random import randint
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
# AWS IoT certificate based connection
# MQQT client is an ID so that the MQTT broker can identify the client
myMQTTClient = AWSIoTMQTTClient("XXXXXXXX")
# this is the unique thing endpoint with the .503 certificate
myMQTTClient.configureEndpoint("XXXXXXXXX.us-west-2.amazonaws.com", 8883)
myMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
myMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz
myMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
myMQTTClient.configureMQTTOperationTimeout(20) # 5 sec
def customCallback(client, userdata, message):
traceback.print_stack()
print('in callback 1')
print(message.payload)
print('in callback 2')
def rand_sensor_data():
print('randomizing sensor data')
for each in payload:
each = randint(1, 51)
try:
rand_sensor_data()
print(payload)
msg = json.dumps(payload)
myMQTTClient.publish("thing01/data", msg, 0)
print('before subscribe')
for x in range(5):
myMQTTClient.subscribe("thing02/water", 0, customCallback)
for line in traceback.format_stack():
print(line.strip())
time.sleep(2)
print('after subscribe')
except KeyboardInterrupt:
GPIO.cleanup()
print('exited')
这是 AWS lambda 代码:
import json
import boto3
def lambda_handler(event, context):
#testing for pi publishing
message = {
'topic': 'thing02/water',
'payload': {'message': 'test'}
}
boto3.client(
'iot-data',
region_name='us-west-2',
aws_access_key_id='<access-key>',
aws_secret_access_key='<secret-access-key'
).publish(
topic='thing02/water',
payload=json.dumps(message),
qos=1
)
print(json.dumps(message))
首先,围绕订阅的循环没有意义,因为 x
从未被使用过,您应该只订阅一次主题。 MQTT 客户端不会在每次调用订阅时轮询主题,它会通知代理它想要所有匹配的消息,然后只是坐下来等待代理发送匹配的消息,直到您取消订阅或断开连接。
您应该将订阅移动到发布之前,然后在发布之前设置并等待响应消息,这消除了客户端丢失消息的任何可能性,因为它仍在尝试处理订阅设置。