使用 aws-iot-device-sdk-python-v2 发布到设备影子的问题

Issues publishing to device shadow using the aws-iot-device-sdk-python-v2

在使用 aws iot device sdk for python v2 (v1.7.1) 的 python 应用程序中,我 运行 遇到无法更新设备影子的问题。

启动程序后,DeviceShadowManager 将尝试获取最新的影子状态并在本地进行设置。 如果存在 delta 状态,DeviceShadowManager 将合并最后的 reported 状态和 delta 状态并发布。 这样可行。但是,当经理订阅更新时,在初始设置后,我 运行 出错了, 当 desired 状态发生变化时,管理器无法更新 reported 状态。这是错误:

Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
  File "/Users/tom/.../lib/python3.9/site-packages/awscrt/mqtt.py", line 506, in callback_wrapper
    callback(topic=topic, payload=payload)
TypeError: callback_wrapper() missing 3 required positional arguments: 'dup', 'qos', and 'retain'

我看了the source,但就是不明白为什么会引发TypeError, 特别是因为这个确切的场景似乎是由 tryexcept 块处理的,还是我弄错了?

错误来源:

 if callback:
    def callback_wrapper(topic, payload, dup, qos, retain):
        try:
            callback(topic=topic, payload=payload, dup=dup, qos=QoS(qos), retain=retain)
        except TypeError:
            # This callback used to have fewer args.
            # Try again, passing only those those args, to cover case where
            # user function failed to take forward-compatibility **kwargs.
            callback(topic=topic, payload=payload) # this is line 506

下面你可以找到我的代码和程序的日志。

这个数据类表示阴影:

from dataclasses import dataclass

@dataclass
class DeviceShadow:
    score_threshold: float = 0.6
    minimum_distance: int = 150

阴影由 DeviceShadowManager 管理。其中大部分是基于 shadow sample from the aforementioned repository.

from dataclasses import asdict
from queue import Queue
from threading import Lock

from awscrt import mqtt
from awsiot import iotshadow
from awsiot.iotshadow import IotShadowClient

from app.device_shadow.device_shadow import DeviceShadow, from_json as device_shadow_from_json
from app.models import log

SHADOW_VALUE_DEFAULT = DeviceShadow()


class DeviceShadowManager:
    _shadow_client: IotShadowClient
    shadow_value: DeviceShadow = DeviceShadow()

    _lock = Lock()
    _thing_name: str

    def __init__(self, thing_name: str, mqtt_connection: mqtt.Connection):
        self._thing_name = thing_name
        self._shadow_client = iotshadow.IotShadowClient(mqtt_connection)

        update_accepted_subscribed_future, _ = self._shadow_client.subscribe_to_update_shadow_accepted(
            request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=self._thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_update_shadow_accepted  # omitted
        )

        update_rejected_subscribed_future, _ = self._shadow_client.subscribe_to_update_shadow_rejected(
            request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=self._thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_update_shadow_rejected  # omitted
        )

        # Wait for subscriptions to succeed
        update_accepted_subscribed_future.result(60)
        update_rejected_subscribed_future.result(60)

        log.info("Subscribing to Get responses...")
        get_accepted_subscribed_future, _ = self._shadow_client.subscribe_to_get_shadow_accepted(
            request=iotshadow.GetShadowSubscriptionRequest(thing_name=self._thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_get_shadow_accepted)

        get_rejected_subscribed_future, _ = self._shadow_client.subscribe_to_get_shadow_rejected(
            request=iotshadow.GetShadowSubscriptionRequest(thing_name=self._thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_get_shadow_rejected)  # omitted

        # Wait for subscriptions to succeed
        get_accepted_subscribed_future.result()
        get_rejected_subscribed_future.result()

        log.info("Subscribing to Delta events...")
        delta_subscribed_future, _ = self._shadow_client.subscribe_to_shadow_delta_updated_events(
            request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(
                thing_name=self._thing_name
            ),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_shadow_delta_updated)

        # Wait for subscription to succeed
        delta_subscribed_future.result()

        # From here on out the rest runs asynchronously.

        # Issue request for shadow's current value.
        # The response will be received by the on_get_accepted() callback
        with self._lock:
            publish_get_future = self._shadow_client.publish_get_shadow(
                request=iotshadow.GetShadowRequest(
                    thing_name=self._thing_name,
                ),
                qos=mqtt.QoS.AT_LEAST_ONCE
            )

        # Ensure that publish succeeds
        publish_get_future.result()

    def on_get_shadow_accepted(self, response: iotshadow.GetShadowResponse) -> None:
        log.info("Finished getting initial shadow value.")

        if response.state and response.state.delta:
            if not response.state.reported:
                response.state.reported = {}
            merged_state = self.merge_states(response.state.delta, response.state.desired)
            return self.set_desired(device_shadow_from_json(merged_state))

        if response.state and response.state.reported:
            return self.set_local(device_shadow_from_json(response.state.reported))

        self.set_desired(SHADOW_VALUE_DEFAULT)
        return

    def on_shadow_delta_updated(self, delta: iotshadow.ShadowDeltaUpdatedEvent) -> None:
        if delta.state:
            if delta.state is None:
                log.info("Delta reports that nothing is set. Setting defaults...")
                self.set_desired(SHADOW_VALUE_DEFAULT)
                return

            log.info("Delta reports that desired shadow is '{}'. Changing local shadow...".format(delta.state))
            self.set_desired(self.merge_states(delta.state, self.shadow_value))

        else:
            log.info("Delta did not report a change")

    @staticmethod
    def merge_states(delta: dict, reported: DeviceShadow):
        for key, value in delta.items():
            reported[key] = value

        return reported

    def set_local(self, value: DeviceShadow) -> None:
        with self._lock:
            self.shadow_value = value

    def set_desired(self, new_value: DeviceShadow) -> None:
        with self._lock:
            if self.shadow_value == new_value:
                log.debug("Local shadow is already '{}'.".format(new_value))
                return

            log.debug("Changing local shadow to '{}'.".format(new_value))
            self.shadow_value = new_value

            log.debug("Updating reported shadow  to '{}'...".format(new_value))
            request = iotshadow.UpdateShadowRequest(
                thing_name=self._thing_name,
                state=iotshadow.ShadowState(
                    desired=asdict(new_value),
                    reported=asdict(new_value),
                ),
            )

            self._shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE)

您将在下面找到日志:

DEBUG:app.mqtt:Connecting to xxxxxxxxxxxxxx-ats.iot.eu-central-1.amazonaws.com with client ID '80d8bc54-971e-0e65-a537-37d14a3cb630'...
INFO:app.models:Subscribing to Get responses...
INFO:app.models:Subscribing to Delta events...
INFO:app.models:Finished getting initial shadow value.
DEBUG:app.models:Changed local shadow to 'DeviceShadow(score_threshold=0.7, minimum_distance=1503)'.
DEBUG:app.models:Updating reported shadow  to 'DeviceShadow(score_threshold=0.7, minimum_distance=1503)'...
INFO:app.models:Update request published.
DEBUG:app.models:Finished updating reported shadow to '{'score_threshold': 0.7, 'minimum_distance': 1503}'.
INFO:app.models:Delta reports that desired shadow is '{'minimum_distance': 15035}'. Changing local shadow...
Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
  File "/Users/tom/.../lib/python3.9/site-packages/awscrt/mqtt.py", line 506, in callback_wrapper
    callback(topic=topic, payload=payload)
TypeError: callback_wrapper() missing 3 required positional arguments: 'dup', 'qos', and 'retain'
DEBUG:app.models:Finished updating reported shadow to '{'score_threshold': 0.7, 'minimum_distance': 1503}'.

如您所见,堆栈跟踪非常短,有没有更好的调试方法? 关于为什么它会给我这个特定错误以及如何解决它的任何想法? 感谢所有帮助!

我很确定问题出在

@staticmethod
def merge_states(delta: dict, reported: DeviceShadow):
   for key, value in delta.items():
      reported[key] = value
   return reported

reported 参数的 __setitem__ 调用会引发 TypeError,因为报告的参数是 DeviceShadow 不支持项目分配的数据类对象。

如果你想在你有字段名称字符串的地方设置数据类的字段,你可以使用setattr(reported, key, value)