如何让演员同时做两件事?

How to make an actor do two things simultaneously?

我定义了一个learner和一个worker。我希望learner运行在后台有它的成员函数learn,偶尔worker发送learner一些信息打印

以下代码为示例

import ray

@ray.remote
class Learner():
    def __init__(self):
        pass

    def learn(self):
        while True:
            pass # do something, such as updating network 

    def log_score(self, score):
        print('worker', score)

@ray.remote
class Worker():
    def __init__(self, learner):
        self.learner = learner

    def sample(self):
        for i in range(1000000):
            if i % 1000 == 0:
                self.learner.log_score.remote(i)

ray.init()

learner = Learner.remote()
worker = Worker.remote(learner)


worker.sample.remote()
learner.learn.remote()

while True:
    pass

然而,learner不会运行log_score直到learn完成,这不是我想要的。我想到了一种让它工作的方法:我没有显式调用 Learner.learn,而是让 Worker 调用它。具体来说,我重新定义learnsample如下

"""Learner"""
def learn(self):
    # no loop here
    pass # do something, such as updating network 

"""Worker"""
def sample(self):
    for i in range(1000000):
        if i % 1000 == 0:
            self.learner.learn.remote()
            self.learner.log_score.remote(i)

虽然这可行,但现在我必须控制调用 learn 的频率,这似乎有点多余。有什么更好的方法可以实现我想要的吗?

这是一个很好的问题。在 Ray 的 actor 模型中,每个 actor 任务都是原子的,因为 actor 将一次执行任务,并且在前一个任务完成 return 之前不会开始一个新任务。这种选择简化了关于并发性的推理,但让演员更难同时做两件事。

要完成这样的工作,您基本上有两种选择。

  1. 线程: 让 actor 在后台线程中做一些工作,让 actor 的主线程空闲,以便它可以执行新任务。

    import ray
    import threading
    import time
    
    @ray.remote
    class Actor(object):
        def __init__(self):
            self.value = 0
            self.t = threading.Thread(target=self.update, args=())
            self.t.start()
    
        def update(self):
            while True:
                time.sleep(0.01)
                self.value += 1
    
        def get_value(self):
            return self.value
    
    ray.init()
    
    # Create the actor. This will start a long-running thread in the background
    # that updates the value.
    a = Actor.remote()
    
    # Get the value a couple times.
    print(ray.get(a.get_value.remote()))
    print(ray.get(a.get_value.remote()))
    
  2. 更小的工作单元: 这意味着重构代码,以便没有演员方法永远循环。在您的示例中,您可以在循环一定次数后使 learn 函数 return 。在这种情况下,必须不断提交新的 learn 任务。甚至可以让 learn 方法提交 return 并提交自身,以便在两者之间安排其他方法。有很多方法可以做到这一点,这取决于您的应用程序,但下面是一个示例。

    import ray
    import threading
    import time
    
    @ray.remote
    class Actor(object):
        def __init__(self):
            self.value = 0
    
        def set_handle_to_self(self, handle_to_self):
            self.handle_to_self = handle_to_self
    
        def learn(self):
            for _ in range(10):
                time.sleep(0.01)
                self.value += 1
    
            # Submit the learn task again so that the learning continues
            # but other methods can be scheduled in between.
            self.handle_to_self.learn.remote()
    
        def get_value(self):
            return self.value
    
    ray.init()
    
    # Create the actor. This will start a long-running thread in the background
    # that updates the value.
    a = Actor.remote()
    # Give the actor a handle to itself so that it can submit tasks to itself.
    a.set_handle_to_self.remote(a)
    
    # Start the learning, which will continue forever.
    a.learn.remote()
    
    # Get the value a couple times.
    print(ray.get(a.get_value.remote()))
    print(ray.get(a.get_value.remote()))