Is there a way to do actions as soon as a process is finished processing instead of waiting for all to finish?
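I'm using the Ray library to process a large number of tasks in parallel. My approach is to take a batch of tasks from a queue, process them together, and then update the queue, which means each level is limited by its slowest task. Is there a way to update the queue dynamically every time a single task finishes?
Here is some code: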
while len(queue) > 0:
    # add some tasks to be processed from queue
    tasks = [x for x in queue if not some_condition(x)]
    # remove tasks from queue
    queue = [x for x in queue if x not in tasks]
    # call Ray remote function on each node in the tasks
    outputs = [self.process_nodes.remote(node) for node in tasks]
    # go through outputs, update queue given children and relevant attributes and iterate.
    out = ray.get(outputs)
    do_stuff(out)
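In the code above, ray.get(outputs) waits for every node in the task list to finish before the loop can continue. Is there a way to take a task's output as soon as that task finishes, process it, update the queue, and keep processing the remaining nodes?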
You can use ray.wait() to check which tasks have already finished and which still need more time, and run a loop until you have collected all the results.
import time
import ray

@ray.remote
def function(i):
    time.sleep(i)
    return i

print('create tasks')
# create tasks
all_tasks = [function.remote(i) for i in range(4)]

print('wait for results')
# run loop to get all results
while all_tasks:
    # block until at least one task has finished;
    # the unfinished ones are returned as the new all_tasks
    finished, all_tasks = ray.wait(all_tasks, num_returns=1, timeout=None)
    for task in finished:
        result = ray.get(task)
        print('result:', result)
    print('len(all_tasks):', len(all_tasks))
Output:
create tasks
wait for results
result: 0
len(all_tasks): 3
result: 1
len(all_tasks): 2
result: 2
len(all_tasks): 1
result: 3
len(all_tasks): 0
If you use num_returns=2, it waits for 2 results at a time, and you get:
create tasks
wait for results
result: 0
result: 1
len(all_tasks): 2
result: 2
result: 3
len(all_tasks): 0
If you use timeout=0.5, it waits at most 0.5 s for results, so you may get something like this:
create tasks
wait for results
result: 0
len(all_tasks): 3
len(all_tasks): 3
len(all_tasks): 3
result: 1
len(all_tasks): 2
len(all_tasks): 2
result: 2
len(all_tasks): 1
len(all_tasks): 1
len(all_tasks): 1
result: 3
len(all_tasks): 0
Documentation: ray.wait and Fetching results