如何使用射线多处理有效地填充列表?

How to efficiently fill a list with ray multi-processing?

我正在开展一个项目,我需要收集 rollouts 来填充数据集,然后在其上训练模型。我想并行收集这些 rollouts 以加快进程,所以我尝试使用 ray 库提供的多处理:

import time
import ray

ray.init()

@ray.remote
class MainActor:
    def __init__(self):
        self.data_set = []

    def process_item(self, item):
        # process incoming item
        item+=1
        return item

    def append(self, val):
        # add item to data set after processing it
        item = self.process_item(val)
        self.data_set.append(item)

    def train(self):
        # train on datatset
        return len(self.data_set)

main_actor = MainActor.remote()

@ray.remote
def rollout_collector(main_actor):
    t = time.time()
    for i in range(40000):
        main_actor.append.remote(i)
    print("time per rollout : ",time.time() - t)


t = time.time()
ray.get([rollout_collector.remote(main_actor) for i in range(3)])
ray.get(main_actor.train.remote())
print("total time with multi-processing: ",time.time() - t)

# ============================== Single process ================================

class MainActor:
    def __init__(self):
        self.data_set = []

    def process_item(self, item):
        # process incoming item
        item+=1
        return item

    def append(self, val):
        # add item to data set after processing it
        item = self.process_item(val)
        self.data_set.append(item)

    def train(self):
        # train on datatset
        pass

main_actor = MainActor()
t = time.time()
for i in range(120000):
    main_actor.append(i)
main_actor.train()
print("total time with single process : ",time.time() - t)

rollout_collector 收集项目,然后在处理后将它们存储在 MainActor 中,最终对其进行训练。但是,这种方法非常慢:

使用光线时每次推出需要 20 秒,而没有光线和多处理时为 0.12 秒。我有一个 cpu 给主要演员,3 个给 rollout_collector 。 我不认为我可以将 rollouts 存储在 rollout_collector 工作人员中,然后将所有内容发送给主要参与者,因为我正在使用水库采样进行培训。此外,在使用 ray 时,对于一个非常简单的函数,我的执行时间非常长:MainActor 上的训练函数只是 return 数据集的长度,但它仍然需要 20 秒才能执行。

所以我的问题是我做错了什么吗?实现我的目标的最佳方式是什么?

跨进程RPC的成本(actor.append.remote(i))肯定会比修改进程内数据结构(list.append(i))高。前者涉及 serialization/deserialization、通过系统调用进行的进程间通信以及排队,因为您是从 3 个并发工作进程调用它的。 actor.train.remote() 花了这么长时间的原因可能也是因为 120K 排队的远程调用。

如果实际上您的推出过程需要更多时间,那么多处理肯定会加快速度。我在你的 rollout_collector 和最后的单进程循环中添加了 time.sleep(0.01) 延迟,将推出次数减少到 400,然后多进程版本用了 4.5 秒完成,对比 13单进程版本的秒数。

如果您的个人推出速度非常快,请考虑在工作进程中对结果进行本地批处理,然后再将它们发送给主要参与者。