get_multiple_points volttron RPC 调用

get_multiple_points volttron RPC call

我是否有机会获得有关构建代理的正确方法的提示,该代理可以从 BACnet 系统上的多个设备读取多个点?我正在查看 actuator agent 代码,试图了解如何进行正确的 rpc 调用。

因此使用代理创建向导完成 agent development procedure

init 中,我目前只是硬编码:

def __init__(self, **kwargs):
    super(Setteroccvav, self).__init__(**kwargs)
    _log.debug("vip_identity: " + self.core.identity)

    self.default_config = {}


    self.agent_id = "dr_event_setpoint_adj_agent"

    self.topic = "slipstream_internal/slipstream_hq/"
    self.jci_zonetemp_string = "/ZN-T"

所以建筑物中的 BACnet 系统有 JCI VAV 箱,所有箱子都带有相同的区域温度传感器点 self.jci_zonetemp_stringself.topic 是我通过 BACnet 发现将它们拉入 volttron/config 商店的方式进程。

在我的驱动点函数(从 CSV 驱动程序示例中复制)中,我是否完全了解如何使用 get_multiple_points 进行名为 reads 的 rpc 调用?希望抓取 BACnet 设备 ID 6、7、8、9、10 上的区域温度传感器读数,它们都是具有相同 points/BAS 程序 运行 的相同 VAV 箱控制器 运行。

def actuate_point(self):
    """
    Request that the Actuator set a point on the CSV device
    """

    # Create a start and end timestep to serve as the times we reserve to communicate with the CSV Device
    _now = get_aware_utc_now()
    str_now = format_timestamp(_now)
    _end = _now + td(seconds=10)
    str_end = format_timestamp(_end)

    # Wrap the timestamps and device topic (used by the Actuator to identify the device) into an actuator request
    schedule_request = [[self.ahu_topic, str_now, str_end]]
    
    # Use a remote procedure call to ask the actuator to schedule us some time on the device
    result = self.vip.rpc.call(
        'platform.actuator', 'request_new_schedule', self.agent_id, 'my_test', 'HIGH', schedule_request).get(
        timeout=4)
    _log.info(f'*** [INFO] *** - SCHEDULED TIME ON ACTUATOR From "actuate_point" method sucess')

 
    reads = publish_agent.vip.rpc.call(
                   'platform.actuator',
                   'get_multiple_points',
                   self.agent_id,
                   [(('self.topic'+'6', self.jci_zonetemp_string)),
                   (('self.topic'+'7', self.jci_zonetemp_string)),
                   (('self.topic'+'8', self.jci_zonetemp_string)),
                   (('self.topic'+'9', self.jci_zonetemp_string)),
                   (('self.topic'+'10', self.jci_zonetemp_string))]).get(timeout=10)

非常感谢在我破坏实时系统之前的任何提示:)

也许这只是我对你的问题的解释,但它似乎有点开放性 - 所以我将以类似的方式回答 - 一般(我会尽量保持简短)。

首先,您需要依次定位每个设备的信息列表;即它可能只包含一个 IP(v4) 地址(用于物理设备)和(逻辑)设备的 BOIN(BACnet 对象实例编号)——或者如果请求是 routed/forwarded 在 by/via 上BACnet router/BACnet 网关然后也可能是 DNET # 和 DADR。

然后你可能想要 - 一次为每个 device/one 检索设备对象列表 属性 的 first/0-element 值 - 以获得对象的数量它包含,让您知道有多少对象可用(包括逻辑 device/device-type 对象)——您需要从 it/iterate 中检索;注意:在现实世界中,尽管设备类型对象成为列表中的第一个很常见,但不能保证总是如此。

尽管 BACnet 标准开始允许从每个对象中检索 属性-List (属性),但大多数设备目前还不支持它,所以您可能需要自己了解每种不同对象类型支持哪些属性(/至少是您感兴趣的属性);可能至少知道哪些 ones/object-types 支持现值 属性 哪些不支持。

一个理想的做法是拥有以下模拟方面 - 作为测试目的的假货,而不是针对 live/important 设备进行测试( - 或者至少考虑针对启用了 noddy BACnet 的设备进行测试 Raspberry PI 或基于硬件的 like):

  • 模拟您的 BACnet 服务
  • BACnet 通信堆栈的模拟
  • 整个设备的模拟(- 如果您不能自己编写一个,那么甚至可以从 YABE 'Room Control Simulator' 作为起点)

希望这(在某种程度上)有所帮助。

执行器RPC调用的基本形式如下:

# use the agent's VIP connection to make an RPC call to the actuator agent 
result = self.vip.rpc.call('platform.actuator', <RPC exported function>, <args>).get(timeout=<seconds>)

因为我们正在处理设备,所以我们需要知道我们对哪些设备感兴趣,以及它们的主题是什么。我们还需要知道我们感兴趣的设备上的哪些点。

device_map = {
  'device1': '201201',
  'device2': '201202',
  'device3': '201203',
  'device4': '201204', 
}

building_topic = 'campus/building'

all_device_points = ['point1', 'point2', 'point3']

使用执行器获取积分需要积分主题列表或 device/point 个主题对。

# we only need one of the following:
point topics = []
for device in device_map.values():
    for point in all_device_points:
        point_topics.append('/'.join([building_topic, device, point]))

device_point_pairs = []
for device in device_map.values():
    for point in all_device_points:
        device_point_pairs.append(('/'.join([building_topic, device]),point,))

现在我们将 RPC 请求发送到执行器:

# can use instead device_point_pairs
point_results = self.vip.rpc.call('platform.actuator', 'get_multiple_points', point_topics).get(timeout=3)