AWS IoT Python 设备 SDK 影子更新超时
AWS IoT Python Device SDK Shadow Update Timeouts
一段时间以来,我一直在为我们的设备开发 AWS IoT 设备影子更新。我们在 Linux(由 Yocto/Bitbake 制作)和 Python 2.7.3(其中是我们的硬件供应商为我们用于 ARM 板的 Yocto 提供的唯一可用版本。
我从 AWS 存储库下载了示例影子更新代码,对其进行了修改以满足我们的需求,并将其放入我们的设备构建中。我会说,它在大多数情况下都有效,但它的失败率仍然让我感到不舒服。请注意,在此代码中,我添加了 "OfflinePublishQueueing" 设置来处理设备连接需要一段时间的情况。这是一个网站推荐的,以防止我们遇到 "PublishQueueDisabled" 错误。这是初始化代码。我会补充一点,我知道我们使用的证书是好的,否则永远不会成功。
self.AWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient("InCoIoTShadowUpdate")
self.AWSIoTMQTTShadowClient.configureEndpoint(endpoint, 8883)
self.AWSIoTMQTTShadowClient.configureCredentials(rootCAPath, privateKeyFile,
jointCertificateFile)
# AWSIoTMQTTShadowClient configuration
self.AWSIoTMQTTShadowClient.configureAutoReconnectBackoffTime(1, 32, 20)
self.AWSIoTMQTTShadowClient.configureConnectDisconnectTimeout(10) # 10 sec
self.AWSIoTMQTTShadowClient.configureMQTTOperationTimeout(15) # 15 sec
MQTTClient = self.AWSIoTMQTTShadowClient.getMQTTConnection()
MQTTClient.configureOfflinePublishQueueing(5, DROP_OLDEST)
稍后更新影子的代码,注意 ThingId 是在本地设备配置的其他地方设置的。
def ConnectAndUpdate(self):
deviceState = InovaIoTDeviceState()
log = logging.getLogger("InovaIoTDeviceClient:connectAndUpdate")
# Connect and subscribe to AWS IoT
try:
self.AWSIoTMQTTShadowClient.connect()
except (connectError, connectTimeoutException):
log.error("Error connecting to AWS IoT service")
return False
# Create a deviceShadow with persistent subscription
updateBot = self.AWSIoTMQTTShadowClient.createShadowHandlerWithName(self.ThingId, True)
JSONPayload = deviceState.GetDeviceShadowDocument()
try:
updateBot.shadowUpdate(JSONPayload, self.ShadowUpdateCallback, 15)
# This is bad if these errors are thrown, probably either an initial device registration failure
# or global problem with Inova AWS IoT console configuration or lambda function
except (publishError, subscribeError):
log.error('Publish or subscribe error..')
return False
except (publishTimeoutException, subscribeTimeoutException):
# It is possible we are here due to a temporary snafu in AWS
log.error('Publish or subscribe timeout..')
return False
except publishQueueDisabledException:
# From time to time, AWS will randomly disconnect.
log.error('Publish Queue disabled..')
return False
while not self.ResponseReceived:
time.sleep(1)
# reset state
self.ResponseReceived = False
try:
self.AWSIoTMQTTShadowClient.disconnect()
except (disconnectError, disconnectTimeoutException):
log.error('Error attempting to disconnect')
return self.UpdateSuccess
最后是回调代码
# Custom MQTT message callback
def ShadowUpdateCallback(self, payload, responseStatus, token):
log = logging.getLogger("InovaIoTDeviceClient:shadowUpdateCallback")
if responseStatus == "timeout":
log.info("Shadow update timeout")
self.UpdateSuccess = False
elif responseStatus == "accepted":
log.info("Shadow update successful")
self.successive_errors = 0
self.UpdateSuccess = True
elif responseStatus == "rejected":
log.info("Rejected shadow update")
self.UpdateSuccess = False
self.ResponseReceived = True
下面是我们得到的各种错误,比较常见。我想说影子更新的成功率只有60%左右。这是 AWS 报告的超时:
2017-09-06 11:15:13: (INFO:AWSIoTPythonSDK.core.shadow.deviceShadow)
Subscribed to update accepted/rejected topics for deviceShadow:
qqpba4fgsfazl2zfgqq8zkavj (Line:372)
2017-09-06 11:15:13: (INFO:AWSIoTPythonSDK.core.protocol.mqttCore) Offline
publish request detected. (Line:343)
2017-09-06 11:15:13: (INFO:AWSIoTPythonSDK.core.protocol.mqttCore) Try
queueing up this request... (Line:347)
2017-09-06 11:15:28: (INFO:AWSIoTPythonSDK.core.shadow.deviceShadow) Shadow
request with token: InCoIoTShadowUpdate_qqpba4fgsfazl2zfgqq8zkavj_0_acbxa
has timed out. (Line:202)
2017-09-06 11:15:28: (INFO:InCoIoTDeviceClient:shadowUpdateCallback) Shadow
update timeout (Line:188)
这是实际影子更新调用的超时错误
2017-09-06 12:40:11: (INFO:AWSIoTPythonSDK.core.protocol.mqttCore) Connected
to AWS IoT. (Line:302)
2017-09-06 12:40:27: (ERROR:AWSIoTPythonSDK.core.protocol.mqttCore) No
feedback detected for subscribe request 1. Timeout and failed. (Line:413)
2017-09-06 12:40:27: (ERROR:InCoIoTDeviceClient:connectAndUpdate) Publish
or subscribe timeout.. (Line:147)
2017-09-06 12:40:27: (INFO:cycle) Unsuccessful shadow update... (Line:173)
所以这些是超时,基本上都是错误。作为一个额外的问题,我们对我们设备的连接和发布尝试进行了 wireshark,不是为了查看内容(因为它是 TLS 1.2 加密的),而是为了查看连接行为。我们注意到,我们从 wireshark 输出中看到的行为是 AWS 端点由至少 8 个不同的 IP 地址提供服务。在超时情况下,我们总是看到连接发生在某处。如果我们订阅并发布到影子更新主题,但收到超时,它只会连接到其中一个地址。在订阅或发布超时时,它会尝试三个不同的地址。
可能所有这些都在 AWS 上进行,性能在他们的末端,但我想知道是否有人看到过这个并且能够解决这个问题。提高超时值似乎没有帮助。
所以我们已经解决了这个问题。 A.) 我们升级到最近发布的 1.2.0 AWS IoT Python SDK,和 B.) 似乎他们修复了一个错误,该错误允许我在收到 MQTT CONNACK 时使用在线回调。所以我将代码更改为:
try:
self.AWSIoTMQTTShadowClient.onOnline = self.onConnect
self.AWSIoTMQTTShadowClient.connect()
except (connectError, connectTimeoutException):
log.error("Error connecting to AWS IoT service")
return False
while not self.is_aws_connected:
self.seconds_waited_for_conn_aws += 1
if self.seconds_waited_for_conn_aws > 30:
log.error("Timeout waiting for connected status.")
return False
time.sleep(1)
# Create a deviceShadow with persistent subscription
updateBot = self.AWSIoTMQTTShadowClient.createShadowHandlerWithName(self.ThingId, True)
try:
updateBot.shadowUpdate(JSONPayload, self.ShadowUpdateCallback, 15)
# This is bad if these errors are thrown, probably either an initial device registration failure
# or global problem with Inova AWS IoT console configuration or lambda function
except (publishError, subscribeError):
log.error('Publish or subscribe error..')
return False
except (publishTimeoutException, subscribeTimeoutException):
# It is possible we are here due to a temporary snafu in AWS
log.error('Publish or subscribe timeout..')
return False
except publishQueueDisabledException:
# From time to time, AWS will randomly disconnect.
log.error('Publish Queue disabled..')
return False
while not self.ResponseReceived:
time.sleep(1)
# reset state
self.ResponseReceived = False
return self.UpdateSuccess
def onConnect(self):
log = logging.getLogger("InovaIoTDeviceClient:onConnect")
log.info("Callback from AWS layer on connect.")
self.is_aws_connected = True
def runOnce(self):
try:
log = logging.getLogger("cycle")
if not self.ConnectAndUpdate():
log.info("Unsuccessful shadow update...")
finally:
# only here to make sure we disconnect
try:
self.AWSIoTMQTTShadowClient.disconnect()
self.is_aws_connected = False
except (disconnectError, disconnectTimeoutException):
log.error('additional error attempting to disconnect')
如您所见,我在第一行代码中设置了 "onOnline" 回调并等待该状态完成。和以前一样,我经常在等待系统在 AWS SDK 基础中被标记为 STABLE(来自 workers.py in AWS SDK)
def _dispatch_connack(self, mid, rc):
status = self._client_status.get_status()
self._logger.debug("Dispatching [connack] event")
if self._need_recover():
if ClientStatus.STABLE != status: # To avoid multiple connack dispatching
self._logger.debug("Has recovery job")
clean_up_debt = Thread(target=self._clean_up_debt)
clean_up_debt.start()
else:
self._logger.debug("No need for recovery")
self._client_status.set_status(ClientStatus.STABLE)
显然,您不能指望 MQTT 系统在 connect() 调用的另一端完全连接并准备就绪,但如果您等待 onOnline 事件,则可以。这就是我们之前必须进行 configureOfflinePublishQueueing() 调用的原因。以及为什么我们会在日志中看到这个...
2017-09-19 14:45:13: (INFO:AWSIoTPythonSDK.core.protocol.mqtt_core) Offline request detected! (Line:313)
谢天谢地,现在效果好多了。顺便说一句,旧代码在更高功率的处理器上更成功(桌面 Linux VM 上的成功率为 98%,小型 ARM 板上的成功率为 45%),所以我认为他们的 "offline publishing" 系统正在经历竞争我们小型 ARM 处理器板上的条件。
一段时间以来,我一直在为我们的设备开发 AWS IoT 设备影子更新。我们在 Linux(由 Yocto/Bitbake 制作)和 Python 2.7.3(其中是我们的硬件供应商为我们用于 ARM 板的 Yocto 提供的唯一可用版本。
我从 AWS 存储库下载了示例影子更新代码,对其进行了修改以满足我们的需求,并将其放入我们的设备构建中。我会说,它在大多数情况下都有效,但它的失败率仍然让我感到不舒服。请注意,在此代码中,我添加了 "OfflinePublishQueueing" 设置来处理设备连接需要一段时间的情况。这是一个网站推荐的,以防止我们遇到 "PublishQueueDisabled" 错误。这是初始化代码。我会补充一点,我知道我们使用的证书是好的,否则永远不会成功。
self.AWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient("InCoIoTShadowUpdate")
self.AWSIoTMQTTShadowClient.configureEndpoint(endpoint, 8883)
self.AWSIoTMQTTShadowClient.configureCredentials(rootCAPath, privateKeyFile,
jointCertificateFile)
# AWSIoTMQTTShadowClient configuration
self.AWSIoTMQTTShadowClient.configureAutoReconnectBackoffTime(1, 32, 20)
self.AWSIoTMQTTShadowClient.configureConnectDisconnectTimeout(10) # 10 sec
self.AWSIoTMQTTShadowClient.configureMQTTOperationTimeout(15) # 15 sec
MQTTClient = self.AWSIoTMQTTShadowClient.getMQTTConnection()
MQTTClient.configureOfflinePublishQueueing(5, DROP_OLDEST)
稍后更新影子的代码,注意 ThingId 是在本地设备配置的其他地方设置的。
def ConnectAndUpdate(self):
deviceState = InovaIoTDeviceState()
log = logging.getLogger("InovaIoTDeviceClient:connectAndUpdate")
# Connect and subscribe to AWS IoT
try:
self.AWSIoTMQTTShadowClient.connect()
except (connectError, connectTimeoutException):
log.error("Error connecting to AWS IoT service")
return False
# Create a deviceShadow with persistent subscription
updateBot = self.AWSIoTMQTTShadowClient.createShadowHandlerWithName(self.ThingId, True)
JSONPayload = deviceState.GetDeviceShadowDocument()
try:
updateBot.shadowUpdate(JSONPayload, self.ShadowUpdateCallback, 15)
# This is bad if these errors are thrown, probably either an initial device registration failure
# or global problem with Inova AWS IoT console configuration or lambda function
except (publishError, subscribeError):
log.error('Publish or subscribe error..')
return False
except (publishTimeoutException, subscribeTimeoutException):
# It is possible we are here due to a temporary snafu in AWS
log.error('Publish or subscribe timeout..')
return False
except publishQueueDisabledException:
# From time to time, AWS will randomly disconnect.
log.error('Publish Queue disabled..')
return False
while not self.ResponseReceived:
time.sleep(1)
# reset state
self.ResponseReceived = False
try:
self.AWSIoTMQTTShadowClient.disconnect()
except (disconnectError, disconnectTimeoutException):
log.error('Error attempting to disconnect')
return self.UpdateSuccess
最后是回调代码
# Custom MQTT message callback
def ShadowUpdateCallback(self, payload, responseStatus, token):
log = logging.getLogger("InovaIoTDeviceClient:shadowUpdateCallback")
if responseStatus == "timeout":
log.info("Shadow update timeout")
self.UpdateSuccess = False
elif responseStatus == "accepted":
log.info("Shadow update successful")
self.successive_errors = 0
self.UpdateSuccess = True
elif responseStatus == "rejected":
log.info("Rejected shadow update")
self.UpdateSuccess = False
self.ResponseReceived = True
下面是我们得到的各种错误,比较常见。我想说影子更新的成功率只有60%左右。这是 AWS 报告的超时:
2017-09-06 11:15:13: (INFO:AWSIoTPythonSDK.core.shadow.deviceShadow)
Subscribed to update accepted/rejected topics for deviceShadow:
qqpba4fgsfazl2zfgqq8zkavj (Line:372)
2017-09-06 11:15:13: (INFO:AWSIoTPythonSDK.core.protocol.mqttCore) Offline
publish request detected. (Line:343)
2017-09-06 11:15:13: (INFO:AWSIoTPythonSDK.core.protocol.mqttCore) Try
queueing up this request... (Line:347)
2017-09-06 11:15:28: (INFO:AWSIoTPythonSDK.core.shadow.deviceShadow) Shadow
request with token: InCoIoTShadowUpdate_qqpba4fgsfazl2zfgqq8zkavj_0_acbxa
has timed out. (Line:202)
2017-09-06 11:15:28: (INFO:InCoIoTDeviceClient:shadowUpdateCallback) Shadow
update timeout (Line:188)
这是实际影子更新调用的超时错误
2017-09-06 12:40:11: (INFO:AWSIoTPythonSDK.core.protocol.mqttCore) Connected
to AWS IoT. (Line:302)
2017-09-06 12:40:27: (ERROR:AWSIoTPythonSDK.core.protocol.mqttCore) No
feedback detected for subscribe request 1. Timeout and failed. (Line:413)
2017-09-06 12:40:27: (ERROR:InCoIoTDeviceClient:connectAndUpdate) Publish
or subscribe timeout.. (Line:147)
2017-09-06 12:40:27: (INFO:cycle) Unsuccessful shadow update... (Line:173)
所以这些是超时,基本上都是错误。作为一个额外的问题,我们对我们设备的连接和发布尝试进行了 wireshark,不是为了查看内容(因为它是 TLS 1.2 加密的),而是为了查看连接行为。我们注意到,我们从 wireshark 输出中看到的行为是 AWS 端点由至少 8 个不同的 IP 地址提供服务。在超时情况下,我们总是看到连接发生在某处。如果我们订阅并发布到影子更新主题,但收到超时,它只会连接到其中一个地址。在订阅或发布超时时,它会尝试三个不同的地址。
可能所有这些都在 AWS 上进行,性能在他们的末端,但我想知道是否有人看到过这个并且能够解决这个问题。提高超时值似乎没有帮助。
所以我们已经解决了这个问题。 A.) 我们升级到最近发布的 1.2.0 AWS IoT Python SDK,和 B.) 似乎他们修复了一个错误,该错误允许我在收到 MQTT CONNACK 时使用在线回调。所以我将代码更改为:
try:
self.AWSIoTMQTTShadowClient.onOnline = self.onConnect
self.AWSIoTMQTTShadowClient.connect()
except (connectError, connectTimeoutException):
log.error("Error connecting to AWS IoT service")
return False
while not self.is_aws_connected:
self.seconds_waited_for_conn_aws += 1
if self.seconds_waited_for_conn_aws > 30:
log.error("Timeout waiting for connected status.")
return False
time.sleep(1)
# Create a deviceShadow with persistent subscription
updateBot = self.AWSIoTMQTTShadowClient.createShadowHandlerWithName(self.ThingId, True)
try:
updateBot.shadowUpdate(JSONPayload, self.ShadowUpdateCallback, 15)
# This is bad if these errors are thrown, probably either an initial device registration failure
# or global problem with Inova AWS IoT console configuration or lambda function
except (publishError, subscribeError):
log.error('Publish or subscribe error..')
return False
except (publishTimeoutException, subscribeTimeoutException):
# It is possible we are here due to a temporary snafu in AWS
log.error('Publish or subscribe timeout..')
return False
except publishQueueDisabledException:
# From time to time, AWS will randomly disconnect.
log.error('Publish Queue disabled..')
return False
while not self.ResponseReceived:
time.sleep(1)
# reset state
self.ResponseReceived = False
return self.UpdateSuccess
def onConnect(self):
log = logging.getLogger("InovaIoTDeviceClient:onConnect")
log.info("Callback from AWS layer on connect.")
self.is_aws_connected = True
def runOnce(self):
try:
log = logging.getLogger("cycle")
if not self.ConnectAndUpdate():
log.info("Unsuccessful shadow update...")
finally:
# only here to make sure we disconnect
try:
self.AWSIoTMQTTShadowClient.disconnect()
self.is_aws_connected = False
except (disconnectError, disconnectTimeoutException):
log.error('additional error attempting to disconnect')
如您所见,我在第一行代码中设置了 "onOnline" 回调并等待该状态完成。和以前一样,我经常在等待系统在 AWS SDK 基础中被标记为 STABLE(来自 workers.py in AWS SDK)
def _dispatch_connack(self, mid, rc):
status = self._client_status.get_status()
self._logger.debug("Dispatching [connack] event")
if self._need_recover():
if ClientStatus.STABLE != status: # To avoid multiple connack dispatching
self._logger.debug("Has recovery job")
clean_up_debt = Thread(target=self._clean_up_debt)
clean_up_debt.start()
else:
self._logger.debug("No need for recovery")
self._client_status.set_status(ClientStatus.STABLE)
显然,您不能指望 MQTT 系统在 connect() 调用的另一端完全连接并准备就绪,但如果您等待 onOnline 事件,则可以。这就是我们之前必须进行 configureOfflinePublishQueueing() 调用的原因。以及为什么我们会在日志中看到这个...
2017-09-19 14:45:13: (INFO:AWSIoTPythonSDK.core.protocol.mqtt_core) Offline request detected! (Line:313)
谢天谢地,现在效果好多了。顺便说一句,旧代码在更高功率的处理器上更成功(桌面 Linux VM 上的成功率为 98%,小型 ARM 板上的成功率为 45%),所以我认为他们的 "offline publishing" 系统正在经历竞争我们小型 ARM 处理器板上的条件。