有没有一种方法可以在流程完成处理后立即执行操作,而不是等待所有流程完成?

Is there a way to do actions as soon as a process is finished processing instead of waiting for all to finish?

我正在使用 Ray 库并行处理大量任务。我这样做的方法是通过添加要处理的任务,一起处理它们,然后更新一个队列,将我限制在每个级别的最慢任务。有没有办法在每次任务完成时动态更新队列?

这是一些代码:

    while len(queue) > 0: 
        # add some tasks to be processed from queue
        tasks = [x for x in queue if not some_condition(x)]
        # remove tasks from queue
        queue = [x for x in queue if x not in tasks]
        #call Ray remote function on each node in the tasks
        outputs  = [self.process_nodes.remote(node) for node in tasks]
        # go through outputs, update queue given children and relevant attributes and iterate.
        out = ray.get(outputs)
        do_stuff(out)

关于上面的代码,输出正在等待任务列表中的所有节点完成它可以继续,我想知道是否有一种方法可以在任务完成后立即从输出中获取任务,处理它,更新队列并处理任务中所有剩余的节点?[​​=11=]

您可以使用ray.wait()检查哪些任务已完成,哪些需要等待更长时间。

您可以 运行 循环直到获得所有结果。

import time
import ray

@ray.remote
def function(i):
    time.sleep(i)
    return i

print('create tasks')

# create tasks
all_tasks = [function.remote(i) for i in range(4)]

print('wait for results')

# run loop to get all results
while all_tasks:

    # wait for (at least one) finished tasks (and other tasks)
    finished, all_tasks = ray.wait(all_tasks, num_returns=1, timeout=None)
    
    for task in finished:
        result = ray.get(task)
        print('result:', result)
        
    print('len(all_tasks):', len(all_tasks))

结果:

create tasks
wait for results
result: 0
len(all_tasks): 3
result: 1
len(all_tasks): 2
result: 2
len(all_tasks): 1
result: 3
len(all_tasks): 0

如果你使用 num_returns=2 那么它会等待 2 个结果然后你得到

create tasks
wait for results
result: 0
result: 1
len(all_tasks): 2
result: 2
result: 3
len(all_tasks): 0

如果你使用 time=0.5 那么它只会等待 0.5s 的结果所以你可能会得到这样的结果

create tasks
wait for results
result: 0
len(all_tasks): 3
len(all_tasks): 3
len(all_tasks): 3
result: 1
len(all_tasks): 2
len(all_tasks): 2
result: 2
len(all_tasks): 1
len(all_tasks): 1
len(all_tasks): 1
result: 3
len(all_tasks): 0

文档:ray.wait and Fetching results