get_multiple_points volttron RPC 调用
get_multiple_points volttron RPC call
我是否有机会获得有关构建代理的正确方法的提示,该代理可以从 BACnet 系统上的多个设备读取多个点?我正在查看 actuator agent 代码,试图了解如何进行正确的 rpc 调用。
因此使用代理创建向导完成 agent development procedure。
在 init
中,我目前只是硬编码:
def __init__(self, **kwargs):
super(Setteroccvav, self).__init__(**kwargs)
_log.debug("vip_identity: " + self.core.identity)
self.default_config = {}
self.agent_id = "dr_event_setpoint_adj_agent"
self.topic = "slipstream_internal/slipstream_hq/"
self.jci_zonetemp_string = "/ZN-T"
所以建筑物中的 BACnet 系统有 JCI VAV 箱,所有箱子都带有相同的区域温度传感器点 self.jci_zonetemp_string
和 self.topic
是我通过 BACnet 发现将它们拉入 volttron/config 商店的方式进程。
在我的驱动点函数(从 CSV 驱动程序示例中复制)中,我是否完全了解如何使用 get_multiple_points
进行名为 reads
的 rpc 调用?希望抓取 BACnet 设备 ID 6、7、8、9、10 上的区域温度传感器读数,它们都是具有相同 points/BAS 程序 运行 的相同 VAV 箱控制器 运行。
def actuate_point(self):
"""
Request that the Actuator set a point on the CSV device
"""
# Create a start and end timestep to serve as the times we reserve to communicate with the CSV Device
_now = get_aware_utc_now()
str_now = format_timestamp(_now)
_end = _now + td(seconds=10)
str_end = format_timestamp(_end)
# Wrap the timestamps and device topic (used by the Actuator to identify the device) into an actuator request
schedule_request = [[self.ahu_topic, str_now, str_end]]
# Use a remote procedure call to ask the actuator to schedule us some time on the device
result = self.vip.rpc.call(
'platform.actuator', 'request_new_schedule', self.agent_id, 'my_test', 'HIGH', schedule_request).get(
timeout=4)
_log.info(f'*** [INFO] *** - SCHEDULED TIME ON ACTUATOR From "actuate_point" method sucess')
reads = publish_agent.vip.rpc.call(
'platform.actuator',
'get_multiple_points',
self.agent_id,
[(('self.topic'+'6', self.jci_zonetemp_string)),
(('self.topic'+'7', self.jci_zonetemp_string)),
(('self.topic'+'8', self.jci_zonetemp_string)),
(('self.topic'+'9', self.jci_zonetemp_string)),
(('self.topic'+'10', self.jci_zonetemp_string))]).get(timeout=10)
非常感谢在我破坏实时系统之前的任何提示:)
也许这只是我对你的问题的解释,但它似乎有点开放性 - 所以我将以类似的方式回答 - 一般(我会尽量保持简短)。
首先,您需要依次定位每个设备的信息列表;即它可能只包含一个 IP(v4) 地址(用于物理设备)和(逻辑)设备的 BOIN(BACnet 对象实例编号)——或者如果请求是 routed/forwarded 在 by/via 上BACnet router/BACnet 网关然后也可能是 DNET # 和 DADR。
然后你可能想要 - 一次为每个 device/one 检索设备对象列表 属性 的 first/0-element 值 - 以获得对象的数量它包含,让您知道有多少对象可用(包括逻辑 device/device-type 对象)——您需要从 it/iterate 中检索;注意:在现实世界中,尽管设备类型对象成为列表中的第一个很常见,但不能保证总是如此。
尽管 BACnet 标准开始允许从每个对象中检索 属性-List (属性),但大多数设备目前还不支持它,所以您可能需要自己了解每种不同对象类型支持哪些属性(/至少是您感兴趣的属性);可能至少知道哪些 ones/object-types 支持现值 属性 哪些不支持。
一个理想的做法是拥有以下模拟方面 - 作为测试目的的假货,而不是针对 live/important 设备进行测试( - 或者至少考虑针对启用了 noddy BACnet 的设备进行测试 Raspberry PI 或基于硬件的 like):
- 模拟您的 BACnet 服务
- BACnet 通信堆栈的模拟
- 整个设备的模拟(- 如果您不能自己编写一个,那么甚至可以从 YABE 'Room Control Simulator' 作为起点)
希望这(在某种程度上)有所帮助。
执行器RPC调用的基本形式如下:
# use the agent's VIP connection to make an RPC call to the actuator agent
result = self.vip.rpc.call('platform.actuator', <RPC exported function>, <args>).get(timeout=<seconds>)
因为我们正在处理设备,所以我们需要知道我们对哪些设备感兴趣,以及它们的主题是什么。我们还需要知道我们感兴趣的设备上的哪些点。
device_map = {
'device1': '201201',
'device2': '201202',
'device3': '201203',
'device4': '201204',
}
building_topic = 'campus/building'
all_device_points = ['point1', 'point2', 'point3']
使用执行器获取积分需要积分主题列表或 device/point 个主题对。
# we only need one of the following:
point topics = []
for device in device_map.values():
for point in all_device_points:
point_topics.append('/'.join([building_topic, device, point]))
device_point_pairs = []
for device in device_map.values():
for point in all_device_points:
device_point_pairs.append(('/'.join([building_topic, device]),point,))
现在我们将 RPC 请求发送到执行器:
# can use instead device_point_pairs
point_results = self.vip.rpc.call('platform.actuator', 'get_multiple_points', point_topics).get(timeout=3)
我是否有机会获得有关构建代理的正确方法的提示,该代理可以从 BACnet 系统上的多个设备读取多个点?我正在查看 actuator agent 代码,试图了解如何进行正确的 rpc 调用。
因此使用代理创建向导完成 agent development procedure。
在 init
中,我目前只是硬编码:
def __init__(self, **kwargs):
super(Setteroccvav, self).__init__(**kwargs)
_log.debug("vip_identity: " + self.core.identity)
self.default_config = {}
self.agent_id = "dr_event_setpoint_adj_agent"
self.topic = "slipstream_internal/slipstream_hq/"
self.jci_zonetemp_string = "/ZN-T"
所以建筑物中的 BACnet 系统有 JCI VAV 箱,所有箱子都带有相同的区域温度传感器点 self.jci_zonetemp_string
和 self.topic
是我通过 BACnet 发现将它们拉入 volttron/config 商店的方式进程。
在我的驱动点函数(从 CSV 驱动程序示例中复制)中,我是否完全了解如何使用 get_multiple_points
进行名为 reads
的 rpc 调用?希望抓取 BACnet 设备 ID 6、7、8、9、10 上的区域温度传感器读数,它们都是具有相同 points/BAS 程序 运行 的相同 VAV 箱控制器 运行。
def actuate_point(self):
"""
Request that the Actuator set a point on the CSV device
"""
# Create a start and end timestep to serve as the times we reserve to communicate with the CSV Device
_now = get_aware_utc_now()
str_now = format_timestamp(_now)
_end = _now + td(seconds=10)
str_end = format_timestamp(_end)
# Wrap the timestamps and device topic (used by the Actuator to identify the device) into an actuator request
schedule_request = [[self.ahu_topic, str_now, str_end]]
# Use a remote procedure call to ask the actuator to schedule us some time on the device
result = self.vip.rpc.call(
'platform.actuator', 'request_new_schedule', self.agent_id, 'my_test', 'HIGH', schedule_request).get(
timeout=4)
_log.info(f'*** [INFO] *** - SCHEDULED TIME ON ACTUATOR From "actuate_point" method sucess')
reads = publish_agent.vip.rpc.call(
'platform.actuator',
'get_multiple_points',
self.agent_id,
[(('self.topic'+'6', self.jci_zonetemp_string)),
(('self.topic'+'7', self.jci_zonetemp_string)),
(('self.topic'+'8', self.jci_zonetemp_string)),
(('self.topic'+'9', self.jci_zonetemp_string)),
(('self.topic'+'10', self.jci_zonetemp_string))]).get(timeout=10)
非常感谢在我破坏实时系统之前的任何提示:)
也许这只是我对你的问题的解释,但它似乎有点开放性 - 所以我将以类似的方式回答 - 一般(我会尽量保持简短)。
首先,您需要依次定位每个设备的信息列表;即它可能只包含一个 IP(v4) 地址(用于物理设备)和(逻辑)设备的 BOIN(BACnet 对象实例编号)——或者如果请求是 routed/forwarded 在 by/via 上BACnet router/BACnet 网关然后也可能是 DNET # 和 DADR。
然后你可能想要 - 一次为每个 device/one 检索设备对象列表 属性 的 first/0-element 值 - 以获得对象的数量它包含,让您知道有多少对象可用(包括逻辑 device/device-type 对象)——您需要从 it/iterate 中检索;注意:在现实世界中,尽管设备类型对象成为列表中的第一个很常见,但不能保证总是如此。
尽管 BACnet 标准开始允许从每个对象中检索 属性-List (属性),但大多数设备目前还不支持它,所以您可能需要自己了解每种不同对象类型支持哪些属性(/至少是您感兴趣的属性);可能至少知道哪些 ones/object-types 支持现值 属性 哪些不支持。
一个理想的做法是拥有以下模拟方面 - 作为测试目的的假货,而不是针对 live/important 设备进行测试( - 或者至少考虑针对启用了 noddy BACnet 的设备进行测试 Raspberry PI 或基于硬件的 like):
- 模拟您的 BACnet 服务
- BACnet 通信堆栈的模拟
- 整个设备的模拟(- 如果您不能自己编写一个,那么甚至可以从 YABE 'Room Control Simulator' 作为起点)
希望这(在某种程度上)有所帮助。
执行器RPC调用的基本形式如下:
# use the agent's VIP connection to make an RPC call to the actuator agent
result = self.vip.rpc.call('platform.actuator', <RPC exported function>, <args>).get(timeout=<seconds>)
因为我们正在处理设备,所以我们需要知道我们对哪些设备感兴趣,以及它们的主题是什么。我们还需要知道我们感兴趣的设备上的哪些点。
device_map = {
'device1': '201201',
'device2': '201202',
'device3': '201203',
'device4': '201204',
}
building_topic = 'campus/building'
all_device_points = ['point1', 'point2', 'point3']
使用执行器获取积分需要积分主题列表或 device/point 个主题对。
# we only need one of the following:
point topics = []
for device in device_map.values():
for point in all_device_points:
point_topics.append('/'.join([building_topic, device, point]))
device_point_pairs = []
for device in device_map.values():
for point in all_device_points:
device_point_pairs.append(('/'.join([building_topic, device]),point,))
现在我们将 RPC 请求发送到执行器:
# can use instead device_point_pairs
point_results = self.vip.rpc.call('platform.actuator', 'get_multiple_points', point_topics).get(timeout=3)