如何在任务完成时而不是在 Dask 完成后获得任务结果?
How to get results of tasks when they finish and not after all have finished in Dask?
我有一个 dask 数据框,想计算一些独立的任务。有些任务比其他任务快,但我是在较长的任务完成后才得到每个任务的结果。
我创建了一个本地客户端并使用client.compute()
发送任务。然后我用future.result()
得到每个任务的结果
我正在使用线程同时请求结果并测量每个结果的计算时间,如下所示:
def get_result(future,i):
t0 = time.time()
print("calculating result", i)
result = future.result()
print("result {} took {}".format(i, time.time() - t0))
client = Client()
df = dd.read_csv(path_to_csv)
future1 = client.compute(df[df.x > 200])
future2 = client.compute(df[df.x > 500])
threading.Thread(target=get_result, args=[future1,1]).start()
threading.Thread(target=get_result, args=[future2,2]).start()
我希望上面代码的输出类似于:
calculating result 1
calculating result 2
result 2 took 10
result 1 took 46
因为第一个任务比较大。
但是我同时得到了两者
calculating result 1
calculating result 2
result 2 took 46.3046760559082
result 1 took 46.477620363235474
我认为这是因为 future2 实际上在后台计算并在 future1 之前完成,但它会等到 future1 完成到 return。
有什么方法可以在 future2 完成时得到它的结果吗?
您不需要让线程以异步方式使用 futures - 它们本质上已经是异步的,并在后台监视它们的状态。如果你想按照准备好的顺序获得结果,你应该使用 as_completed
.
但是,对于您的具体情况,您可能只想查看仪表板(或使用 df.visulalize()
)以了解正在发生的计算。两种未来都取决于读取 CSV,并且在任何一种 运行 之前都需要完成这项任务 - 并且可能会占用绝大多数时间。 Dask 在不扫描所有数据的情况下不知道哪些行具有 x
.
的值
我有一个 dask 数据框,想计算一些独立的任务。有些任务比其他任务快,但我是在较长的任务完成后才得到每个任务的结果。
我创建了一个本地客户端并使用client.compute()
发送任务。然后我用future.result()
得到每个任务的结果
我正在使用线程同时请求结果并测量每个结果的计算时间,如下所示:
def get_result(future,i):
t0 = time.time()
print("calculating result", i)
result = future.result()
print("result {} took {}".format(i, time.time() - t0))
client = Client()
df = dd.read_csv(path_to_csv)
future1 = client.compute(df[df.x > 200])
future2 = client.compute(df[df.x > 500])
threading.Thread(target=get_result, args=[future1,1]).start()
threading.Thread(target=get_result, args=[future2,2]).start()
我希望上面代码的输出类似于:
calculating result 1
calculating result 2
result 2 took 10
result 1 took 46
因为第一个任务比较大。
但是我同时得到了两者
calculating result 1
calculating result 2
result 2 took 46.3046760559082
result 1 took 46.477620363235474
我认为这是因为 future2 实际上在后台计算并在 future1 之前完成,但它会等到 future1 完成到 return。
有什么方法可以在 future2 完成时得到它的结果吗?
您不需要让线程以异步方式使用 futures - 它们本质上已经是异步的,并在后台监视它们的状态。如果你想按照准备好的顺序获得结果,你应该使用 as_completed
.
但是,对于您的具体情况,您可能只想查看仪表板(或使用 df.visulalize()
)以了解正在发生的计算。两种未来都取决于读取 CSV,并且在任何一种 运行 之前都需要完成这项任务 - 并且可能会占用绝大多数时间。 Dask 在不扫描所有数据的情况下不知道哪些行具有 x
.