如何编写一种方法来检查代理是否已完成 Osbrain 中的任务?

How to write a method that checks if an agent is done with a task in Osbrain?

我有一个问题,关于如何编写一个正确的函数来检查代理是否在 Osbrain 中完成了他们的任务。我有三个代理,传输代理、节点代理和协调器代理。协调代理的主要任务是同步其他代理的动作。协调器代理绑定到 SYNC_PUB,节点和传输代理绑定到协调器代理。我的初始实现在第一个 timestep/iteration 之后挂起。我是否错误地实施了 status_checker 方法?

from osbrain import run_nameserver, run_agent, Agent

import time

SYNCHRONIZER_CHANNEL_1 = 'coordinator1'


class TransportAgent(Agent):
    def transportAgent_first_handler(self, message):
        # time.sleep(2)
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self, message):
        yield 1


class NodeAgent(Agent):
    def NodeAgent_first_handler(self, message):
        time.sleep(2)
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self, message):
        yield 1


class SynchronizerCoordinatorAgent(Agent):
    def on_init(self):
        self.network_agent_addr = self.bind('SYNC_PUB', alias=SYNCHRONIZER_CHANNEL_1, handler='status_handler')
        self.status_list = []

    def first_synchronization(self, time_step, iteration):
        self.send(SYNCHRONIZER_CHANNEL_1, message={'time_step': time_step, 'iteration': iteration},
                  topic='first_synchronization')

    def status_handler(self, message):
        yield 'I have added you to the status_list'
        self.status_list.append(message)

    def status_checker(self):
        count = 0
        while len(self.status_list) < 2:
            count += 1
            time.sleep(1)
            return
        self.status_list.clear()


    def init_environment(self):
        self.TransportAgent = run_agent('TransportAgent', base=TransportAgent)

        self.NodeAgent = run_agent('NodeAgent', base=NodeAgent)

        self.TransportAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                                    handler={'first_synchronization': TransportAgent.transportAgent_first_handler})
        self.NodeAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                               handler={'first_synchronization': NodeAgent.NodeAgent_first_handler})


if __name__ == '__main__':

    ns = run_nameserver()
    synchronizer_coordinator_agent = run_agent('Synchronizer_CoordinatorAgent',
                                               base=SynchronizerCoordinatorAgent)
    synchronizer_coordinator_agent.init_environment()

    for iteration in range(1, 2):
        for time_step in range(0, 90, 30):
            synchronizer_coordinator_agent.first_synchronization(time_step=time_step, iteration=iteration)
            synchronizer_coordinator_agent.status_checker()
    time.sleep(1)

它打印这个然后挂起

(网络代理): {'time_step': 0, 'iteration': 1}

(RMOAgent): {'time_step': 0, 'iteration': 1}

是啊,看来你的 status_checker() 方法有问题。我猜您希望该方法阻塞,直到 status_list 中有 2 条消息(一条来自节点代理,另一条来自传输代理)。

所以您可能正在寻找类似的东西:

def status_checker(self):
    while True:
        if len(self.status_list) == 2:
            break
        time.sleep(1)
    self.status_list.clear()

但是,当您从代理调用该方法时:

synchronizer_coordinator_agent.status_checker()

协调器正在阻止执行该调用,因此它不会处理其他传入消息。一种快速且 肮脏的 解决方法是使用这样的 unsafe call

synchronizer_coordinator_agent.unsafe.status_checker()

我在这里看到的主要问题是您处理来自 __main__ 的状态检查器的方式。您应该将您的 synchronization/steps 移到 您的协调器中。这意味着:

  • __main__ 调用协调器上的 .start_iterations() 方法,以便协调器执行第一步
  • 然后让你的协调器对收到的消息有反应(一旦它收到 2 条消息,它就会执行下一步)
  • 协调器一直执行步骤直到完成
  • 从 main 开始,只需定期监控您的协调代理,看看它何时完成
  • 使用 ns.shutdown()
  • 关闭您的系统

您的主要内容可能如下所示:

if __name__ == "__main__":

    ns = run_nameserver()

    coordinator = run_agent(...)
    coordinator.init_environment()
    coordinator.start_iterations()

    while not coordinator.finished():
        time.sleep(0.5)

    ns.shutdown()

与此并无太大关系,但请注意,您当前的 range(1, 2) 只会导致一次迭代(尽管这可能是故意的)。如果你想要 2 次迭代,你可以使用 range(2).