如何编写一种方法来检查代理是否已完成 Osbrain 中的任务?
How to write a method that checks if an agent is done with a task in Osbrain?
我有一个问题,关于如何编写一个正确的函数来检查代理是否在 Osbrain 中完成了他们的任务。我有三个代理,传输代理、节点代理和协调器代理。协调代理的主要任务是同步其他代理的动作。协调器代理绑定到 SYNC_PUB,节点和传输代理绑定到协调器代理。我的初始实现在第一个 timestep/iteration 之后挂起。我是否错误地实施了 status_checker 方法?
from osbrain import run_nameserver, run_agent, Agent
import time
SYNCHRONIZER_CHANNEL_1 = 'coordinator1'
class TransportAgent(Agent):
def transportAgent_first_handler(self, message):
# time.sleep(2)
self.log_info(message)
self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')
def process_reply(self, message):
yield 1
class NodeAgent(Agent):
def NodeAgent_first_handler(self, message):
time.sleep(2)
self.log_info(message)
self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')
def process_reply(self, message):
yield 1
class SynchronizerCoordinatorAgent(Agent):
def on_init(self):
self.network_agent_addr = self.bind('SYNC_PUB', alias=SYNCHRONIZER_CHANNEL_1, handler='status_handler')
self.status_list = []
def first_synchronization(self, time_step, iteration):
self.send(SYNCHRONIZER_CHANNEL_1, message={'time_step': time_step, 'iteration': iteration},
topic='first_synchronization')
def status_handler(self, message):
yield 'I have added you to the status_list'
self.status_list.append(message)
def status_checker(self):
count = 0
while len(self.status_list) < 2:
count += 1
time.sleep(1)
return
self.status_list.clear()
def init_environment(self):
self.TransportAgent = run_agent('TransportAgent', base=TransportAgent)
self.NodeAgent = run_agent('NodeAgent', base=NodeAgent)
self.TransportAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
handler={'first_synchronization': TransportAgent.transportAgent_first_handler})
self.NodeAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
handler={'first_synchronization': NodeAgent.NodeAgent_first_handler})
if __name__ == '__main__':
ns = run_nameserver()
synchronizer_coordinator_agent = run_agent('Synchronizer_CoordinatorAgent',
base=SynchronizerCoordinatorAgent)
synchronizer_coordinator_agent.init_environment()
for iteration in range(1, 2):
for time_step in range(0, 90, 30):
synchronizer_coordinator_agent.first_synchronization(time_step=time_step, iteration=iteration)
synchronizer_coordinator_agent.status_checker()
time.sleep(1)
它打印这个然后挂起
(网络代理): {'time_step': 0, 'iteration': 1}
(RMOAgent): {'time_step': 0, 'iteration': 1}
是啊,看来你的 status_checker()
方法有问题。我猜您希望该方法阻塞,直到 status_list
中有 2 条消息(一条来自节点代理,另一条来自传输代理)。
所以您可能正在寻找类似的东西:
def status_checker(self):
while True:
if len(self.status_list) == 2:
break
time.sleep(1)
self.status_list.clear()
但是,当您从代理调用该方法时:
synchronizer_coordinator_agent.status_checker()
协调器正在阻止执行该调用,因此它不会处理其他传入消息。一种快速且 肮脏的 解决方法是使用这样的 unsafe
call:
synchronizer_coordinator_agent.unsafe.status_checker()
我在这里看到的主要问题是您处理来自 __main__
的状态检查器的方式。您应该将您的 synchronization/steps 移到 您的协调器中。这意味着:
- 从
__main__
调用协调器上的 .start_iterations()
方法,以便协调器执行第一步
- 然后让你的协调器对收到的消息有反应(一旦它收到 2 条消息,它就会执行下一步)
- 协调器一直执行步骤直到完成
- 从 main 开始,只需定期监控您的协调代理,看看它何时完成
- 使用
ns.shutdown()
关闭您的系统
您的主要内容可能如下所示:
if __name__ == "__main__":
ns = run_nameserver()
coordinator = run_agent(...)
coordinator.init_environment()
coordinator.start_iterations()
while not coordinator.finished():
time.sleep(0.5)
ns.shutdown()
与此并无太大关系,但请注意,您当前的 range(1, 2)
只会导致一次迭代(尽管这可能是故意的)。如果你想要 2 次迭代,你可以使用 range(2)
.
我有一个问题,关于如何编写一个正确的函数来检查代理是否在 Osbrain 中完成了他们的任务。我有三个代理,传输代理、节点代理和协调器代理。协调代理的主要任务是同步其他代理的动作。协调器代理绑定到 SYNC_PUB,节点和传输代理绑定到协调器代理。我的初始实现在第一个 timestep/iteration 之后挂起。我是否错误地实施了 status_checker 方法?
from osbrain import run_nameserver, run_agent, Agent
import time
SYNCHRONIZER_CHANNEL_1 = 'coordinator1'
class TransportAgent(Agent):
def transportAgent_first_handler(self, message):
# time.sleep(2)
self.log_info(message)
self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')
def process_reply(self, message):
yield 1
class NodeAgent(Agent):
def NodeAgent_first_handler(self, message):
time.sleep(2)
self.log_info(message)
self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')
def process_reply(self, message):
yield 1
class SynchronizerCoordinatorAgent(Agent):
def on_init(self):
self.network_agent_addr = self.bind('SYNC_PUB', alias=SYNCHRONIZER_CHANNEL_1, handler='status_handler')
self.status_list = []
def first_synchronization(self, time_step, iteration):
self.send(SYNCHRONIZER_CHANNEL_1, message={'time_step': time_step, 'iteration': iteration},
topic='first_synchronization')
def status_handler(self, message):
yield 'I have added you to the status_list'
self.status_list.append(message)
def status_checker(self):
count = 0
while len(self.status_list) < 2:
count += 1
time.sleep(1)
return
self.status_list.clear()
def init_environment(self):
self.TransportAgent = run_agent('TransportAgent', base=TransportAgent)
self.NodeAgent = run_agent('NodeAgent', base=NodeAgent)
self.TransportAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
handler={'first_synchronization': TransportAgent.transportAgent_first_handler})
self.NodeAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
handler={'first_synchronization': NodeAgent.NodeAgent_first_handler})
if __name__ == '__main__':
ns = run_nameserver()
synchronizer_coordinator_agent = run_agent('Synchronizer_CoordinatorAgent',
base=SynchronizerCoordinatorAgent)
synchronizer_coordinator_agent.init_environment()
for iteration in range(1, 2):
for time_step in range(0, 90, 30):
synchronizer_coordinator_agent.first_synchronization(time_step=time_step, iteration=iteration)
synchronizer_coordinator_agent.status_checker()
time.sleep(1)
它打印这个然后挂起
(网络代理): {'time_step': 0, 'iteration': 1}
(RMOAgent): {'time_step': 0, 'iteration': 1}
是啊,看来你的 status_checker()
方法有问题。我猜您希望该方法阻塞,直到 status_list
中有 2 条消息(一条来自节点代理,另一条来自传输代理)。
所以您可能正在寻找类似的东西:
def status_checker(self):
while True:
if len(self.status_list) == 2:
break
time.sleep(1)
self.status_list.clear()
但是,当您从代理调用该方法时:
synchronizer_coordinator_agent.status_checker()
协调器正在阻止执行该调用,因此它不会处理其他传入消息。一种快速且 肮脏的 解决方法是使用这样的 unsafe
call:
synchronizer_coordinator_agent.unsafe.status_checker()
我在这里看到的主要问题是您处理来自 __main__
的状态检查器的方式。您应该将您的 synchronization/steps 移到 您的协调器中。这意味着:
- 从
__main__
调用协调器上的.start_iterations()
方法,以便协调器执行第一步 - 然后让你的协调器对收到的消息有反应(一旦它收到 2 条消息,它就会执行下一步)
- 协调器一直执行步骤直到完成
- 从 main 开始,只需定期监控您的协调代理,看看它何时完成
- 使用
ns.shutdown()
关闭您的系统
您的主要内容可能如下所示:
if __name__ == "__main__":
ns = run_nameserver()
coordinator = run_agent(...)
coordinator.init_environment()
coordinator.start_iterations()
while not coordinator.finished():
time.sleep(0.5)
ns.shutdown()
与此并无太大关系,但请注意,您当前的 range(1, 2)
只会导致一次迭代(尽管这可能是故意的)。如果你想要 2 次迭代,你可以使用 range(2)
.