如何找到 Dask 分布式函数调用的 concurrent.future 输入参数
How to find the concurrent.future input arguments for a Dask distributed function call
我正在使用 Dask 将工作分配到集群。我正在创建一个集群并调用 .submit()
向调度程序提交一个函数。它 returns 一个 Futures 对象。我正在尝试弄清楚如何在该未来对象完成后获取输入参数。
例如:
from dask.distributed import Client
from dask_yarn import YarnCluster
def somefunc(a,b,c ..., n ):
# do something
return
cluster = YarnCluster.from_specification(spec)
client = Client(cluster)
future = client.submit(somefunc, arg1, arg2, ..., argn)
# ^^^ how do I obtain the input arguments for this future object?
# `future.args` doesn't work
一个 future 只知道它在调度程序上唯一知道的密钥。在提交时,如果它有依赖项,这些依赖项会被暂时找到并发送给调度程序,但如果保存在本地则不会复制。
您所追求的模式听起来更像是 delayed
,它保留了它的图表,实际上 client.compute(delayed_thing)
returns 一个未来。
d = delayed(somefunc)(a, b, c)
future = client.compute(d)
dict(d.dask) # graph of things needed by d
您可以直接与调度程序通信以找到某些键的依赖关系,这些键通常也是键,因此对图进行逆向工程,但这听起来不是一个好方法,所以我不会不要试图在这里描述它。
期货不会保留他们的投入。不过你可以自己做。
futures = {}
future = client.submit(func, *args)
futures[future] = args
我正在使用 Dask 将工作分配到集群。我正在创建一个集群并调用 .submit()
向调度程序提交一个函数。它 returns 一个 Futures 对象。我正在尝试弄清楚如何在该未来对象完成后获取输入参数。
例如:
from dask.distributed import Client
from dask_yarn import YarnCluster
def somefunc(a,b,c ..., n ):
# do something
return
cluster = YarnCluster.from_specification(spec)
client = Client(cluster)
future = client.submit(somefunc, arg1, arg2, ..., argn)
# ^^^ how do I obtain the input arguments for this future object?
# `future.args` doesn't work
一个 future 只知道它在调度程序上唯一知道的密钥。在提交时,如果它有依赖项,这些依赖项会被暂时找到并发送给调度程序,但如果保存在本地则不会复制。
您所追求的模式听起来更像是 delayed
,它保留了它的图表,实际上 client.compute(delayed_thing)
returns 一个未来。
d = delayed(somefunc)(a, b, c)
future = client.compute(d)
dict(d.dask) # graph of things needed by d
您可以直接与调度程序通信以找到某些键的依赖关系,这些键通常也是键,因此对图进行逆向工程,但这听起来不是一个好方法,所以我不会不要试图在这里描述它。
期货不会保留他们的投入。不过你可以自己做。
futures = {}
future = client.submit(func, *args)
futures[future] = args