如何让演员同时做两件事?
How to make an actor do two things simultaneously?
我定义了一个learner
和一个worker
。我希望learner
运行在后台有它的成员函数learn
,偶尔worker
发送learner
一些信息打印
以下代码为示例
import ray
@ray.remote
class Learner():
def __init__(self):
pass
def learn(self):
while True:
pass # do something, such as updating network
def log_score(self, score):
print('worker', score)
@ray.remote
class Worker():
def __init__(self, learner):
self.learner = learner
def sample(self):
for i in range(1000000):
if i % 1000 == 0:
self.learner.log_score.remote(i)
ray.init()
learner = Learner.remote()
worker = Worker.remote(learner)
worker.sample.remote()
learner.learn.remote()
while True:
pass
然而,learner
不会运行log_score
直到learn
完成,这不是我想要的。我想到了一种让它工作的方法:我没有显式调用 Learner.learn
,而是让 Worker
调用它。具体来说,我重新定义learn
和sample
如下
"""Learner"""
def learn(self):
# no loop here
pass # do something, such as updating network
"""Worker"""
def sample(self):
for i in range(1000000):
if i % 1000 == 0:
self.learner.learn.remote()
self.learner.log_score.remote(i)
虽然这可行,但现在我必须控制调用 learn
的频率,这似乎有点多余。有什么更好的方法可以实现我想要的吗?
这是一个很好的问题。在 Ray 的 actor 模型中,每个 actor 任务都是原子的,因为 actor 将一次执行任务,并且在前一个任务完成 return 之前不会开始一个新任务。这种选择简化了关于并发性的推理,但让演员更难同时做两件事。
要完成这样的工作,您基本上有两种选择。
线程: 让 actor 在后台线程中做一些工作,让 actor 的主线程空闲,以便它可以执行新任务。
import ray
import threading
import time
@ray.remote
class Actor(object):
def __init__(self):
self.value = 0
self.t = threading.Thread(target=self.update, args=())
self.t.start()
def update(self):
while True:
time.sleep(0.01)
self.value += 1
def get_value(self):
return self.value
ray.init()
# Create the actor. This will start a long-running thread in the background
# that updates the value.
a = Actor.remote()
# Get the value a couple times.
print(ray.get(a.get_value.remote()))
print(ray.get(a.get_value.remote()))
更小的工作单元: 这意味着重构代码,以便没有演员方法永远循环。在您的示例中,您可以在循环一定次数后使 learn
函数 return 。在这种情况下,必须不断提交新的 learn
任务。甚至可以让 learn
方法提交 return 并提交自身,以便在两者之间安排其他方法。有很多方法可以做到这一点,这取决于您的应用程序,但下面是一个示例。
import ray
import threading
import time
@ray.remote
class Actor(object):
def __init__(self):
self.value = 0
def set_handle_to_self(self, handle_to_self):
self.handle_to_self = handle_to_self
def learn(self):
for _ in range(10):
time.sleep(0.01)
self.value += 1
# Submit the learn task again so that the learning continues
# but other methods can be scheduled in between.
self.handle_to_self.learn.remote()
def get_value(self):
return self.value
ray.init()
# Create the actor. This will start a long-running thread in the background
# that updates the value.
a = Actor.remote()
# Give the actor a handle to itself so that it can submit tasks to itself.
a.set_handle_to_self.remote(a)
# Start the learning, which will continue forever.
a.learn.remote()
# Get the value a couple times.
print(ray.get(a.get_value.remote()))
print(ray.get(a.get_value.remote()))
我定义了一个learner
和一个worker
。我希望learner
运行在后台有它的成员函数learn
,偶尔worker
发送learner
一些信息打印
以下代码为示例
import ray
@ray.remote
class Learner():
def __init__(self):
pass
def learn(self):
while True:
pass # do something, such as updating network
def log_score(self, score):
print('worker', score)
@ray.remote
class Worker():
def __init__(self, learner):
self.learner = learner
def sample(self):
for i in range(1000000):
if i % 1000 == 0:
self.learner.log_score.remote(i)
ray.init()
learner = Learner.remote()
worker = Worker.remote(learner)
worker.sample.remote()
learner.learn.remote()
while True:
pass
然而,learner
不会运行log_score
直到learn
完成,这不是我想要的。我想到了一种让它工作的方法:我没有显式调用 Learner.learn
,而是让 Worker
调用它。具体来说,我重新定义learn
和sample
如下
"""Learner"""
def learn(self):
# no loop here
pass # do something, such as updating network
"""Worker"""
def sample(self):
for i in range(1000000):
if i % 1000 == 0:
self.learner.learn.remote()
self.learner.log_score.remote(i)
虽然这可行,但现在我必须控制调用 learn
的频率,这似乎有点多余。有什么更好的方法可以实现我想要的吗?
这是一个很好的问题。在 Ray 的 actor 模型中,每个 actor 任务都是原子的,因为 actor 将一次执行任务,并且在前一个任务完成 return 之前不会开始一个新任务。这种选择简化了关于并发性的推理,但让演员更难同时做两件事。
要完成这样的工作,您基本上有两种选择。
线程: 让 actor 在后台线程中做一些工作,让 actor 的主线程空闲,以便它可以执行新任务。
import ray import threading import time @ray.remote class Actor(object): def __init__(self): self.value = 0 self.t = threading.Thread(target=self.update, args=()) self.t.start() def update(self): while True: time.sleep(0.01) self.value += 1 def get_value(self): return self.value ray.init() # Create the actor. This will start a long-running thread in the background # that updates the value. a = Actor.remote() # Get the value a couple times. print(ray.get(a.get_value.remote())) print(ray.get(a.get_value.remote()))
更小的工作单元: 这意味着重构代码,以便没有演员方法永远循环。在您的示例中,您可以在循环一定次数后使
learn
函数 return 。在这种情况下,必须不断提交新的learn
任务。甚至可以让learn
方法提交 return 并提交自身,以便在两者之间安排其他方法。有很多方法可以做到这一点,这取决于您的应用程序,但下面是一个示例。import ray import threading import time @ray.remote class Actor(object): def __init__(self): self.value = 0 def set_handle_to_self(self, handle_to_self): self.handle_to_self = handle_to_self def learn(self): for _ in range(10): time.sleep(0.01) self.value += 1 # Submit the learn task again so that the learning continues # but other methods can be scheduled in between. self.handle_to_self.learn.remote() def get_value(self): return self.value ray.init() # Create the actor. This will start a long-running thread in the background # that updates the value. a = Actor.remote() # Give the actor a handle to itself so that it can submit tasks to itself. a.set_handle_to_self.remote(a) # Start the learning, which will continue forever. a.learn.remote() # Get the value a couple times. print(ray.get(a.get_value.remote())) print(ray.get(a.get_value.remote()))