如何使用射线多处理有效地填充列表?
How to efficiently fill a list with ray multi-processing?
我正在开展一个项目,我需要收集 rollouts 来填充数据集,然后在其上训练模型。我想并行收集这些 rollouts 以加快进程,所以我尝试使用 ray 库提供的多处理:
import time
import ray
ray.init()
@ray.remote
class MainActor:
def __init__(self):
self.data_set = []
def process_item(self, item):
# process incoming item
item+=1
return item
def append(self, val):
# add item to data set after processing it
item = self.process_item(val)
self.data_set.append(item)
def train(self):
# train on datatset
return len(self.data_set)
main_actor = MainActor.remote()
@ray.remote
def rollout_collector(main_actor):
t = time.time()
for i in range(40000):
main_actor.append.remote(i)
print("time per rollout : ",time.time() - t)
t = time.time()
ray.get([rollout_collector.remote(main_actor) for i in range(3)])
ray.get(main_actor.train.remote())
print("total time with multi-processing: ",time.time() - t)
# ============================== Single process ================================
class MainActor:
def __init__(self):
self.data_set = []
def process_item(self, item):
# process incoming item
item+=1
return item
def append(self, val):
# add item to data set after processing it
item = self.process_item(val)
self.data_set.append(item)
def train(self):
# train on datatset
pass
main_actor = MainActor()
t = time.time()
for i in range(120000):
main_actor.append(i)
main_actor.train()
print("total time with single process : ",time.time() - t)
rollout_collector 收集项目,然后在处理后将它们存储在 MainActor 中,最终对其进行训练。但是,这种方法非常慢:
使用光线时每次推出需要 20 秒,而没有光线和多处理时为 0.12 秒。我有一个 cpu 给主要演员,3 个给 rollout_collector 。
我不认为我可以将 rollouts 存储在 rollout_collector 工作人员中,然后将所有内容发送给主要参与者,因为我正在使用水库采样进行培训。此外,在使用 ray 时,对于一个非常简单的函数,我的执行时间非常长:MainActor 上的训练函数只是 return 数据集的长度,但它仍然需要 20 秒才能执行。
所以我的问题是我做错了什么吗?实现我的目标的最佳方式是什么?
跨进程RPC的成本(actor.append.remote(i)
)肯定会比修改进程内数据结构(list.append(i)
)高。前者涉及 serialization/deserialization、通过系统调用进行的进程间通信以及排队,因为您是从 3 个并发工作进程调用它的。 actor.train.remote()
花了这么长时间的原因可能也是因为 120K 排队的远程调用。
如果实际上您的推出过程需要更多时间,那么多处理肯定会加快速度。我在你的 rollout_collector
和最后的单进程循环中添加了 time.sleep(0.01)
延迟,将推出次数减少到 400,然后多进程版本用了 4.5 秒完成,对比 13单进程版本的秒数。
如果您的个人推出速度非常快,请考虑在工作进程中对结果进行本地批处理,然后再将它们发送给主要参与者。
我正在开展一个项目,我需要收集 rollouts 来填充数据集,然后在其上训练模型。我想并行收集这些 rollouts 以加快进程,所以我尝试使用 ray 库提供的多处理:
import time
import ray
ray.init()
@ray.remote
class MainActor:
def __init__(self):
self.data_set = []
def process_item(self, item):
# process incoming item
item+=1
return item
def append(self, val):
# add item to data set after processing it
item = self.process_item(val)
self.data_set.append(item)
def train(self):
# train on datatset
return len(self.data_set)
main_actor = MainActor.remote()
@ray.remote
def rollout_collector(main_actor):
t = time.time()
for i in range(40000):
main_actor.append.remote(i)
print("time per rollout : ",time.time() - t)
t = time.time()
ray.get([rollout_collector.remote(main_actor) for i in range(3)])
ray.get(main_actor.train.remote())
print("total time with multi-processing: ",time.time() - t)
# ============================== Single process ================================
class MainActor:
def __init__(self):
self.data_set = []
def process_item(self, item):
# process incoming item
item+=1
return item
def append(self, val):
# add item to data set after processing it
item = self.process_item(val)
self.data_set.append(item)
def train(self):
# train on datatset
pass
main_actor = MainActor()
t = time.time()
for i in range(120000):
main_actor.append(i)
main_actor.train()
print("total time with single process : ",time.time() - t)
rollout_collector 收集项目,然后在处理后将它们存储在 MainActor 中,最终对其进行训练。但是,这种方法非常慢:
使用光线时每次推出需要 20 秒,而没有光线和多处理时为 0.12 秒。我有一个 cpu 给主要演员,3 个给 rollout_collector 。 我不认为我可以将 rollouts 存储在 rollout_collector 工作人员中,然后将所有内容发送给主要参与者,因为我正在使用水库采样进行培训。此外,在使用 ray 时,对于一个非常简单的函数,我的执行时间非常长:MainActor 上的训练函数只是 return 数据集的长度,但它仍然需要 20 秒才能执行。
所以我的问题是我做错了什么吗?实现我的目标的最佳方式是什么?
跨进程RPC的成本(actor.append.remote(i)
)肯定会比修改进程内数据结构(list.append(i)
)高。前者涉及 serialization/deserialization、通过系统调用进行的进程间通信以及排队,因为您是从 3 个并发工作进程调用它的。 actor.train.remote()
花了这么长时间的原因可能也是因为 120K 排队的远程调用。
如果实际上您的推出过程需要更多时间,那么多处理肯定会加快速度。我在你的 rollout_collector
和最后的单进程循环中添加了 time.sleep(0.01)
延迟,将推出次数减少到 400,然后多进程版本用了 4.5 秒完成,对比 13单进程版本的秒数。
如果您的个人推出速度非常快,请考虑在工作进程中对结果进行本地批处理,然后再将它们发送给主要参与者。