在 dask distributed 中提交 worker 函数,而无需等待函数结束
Submit worker functions in dask distributed without waiting for the functions to end
我有这个 python 代码,它使用 apscheduler
库来提交进程,它工作正常:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
array = [ 1, 3, 5, 7]
for elem in array:
scheduler.add_job(function_to_submit, kwargs={ 'elem': elem })
scheduler.start()
def function_to_submit(elem):
print(str(elem))
请注意,进程是并行提交的,并且代码不会等待进程结束。
我需要的是将此代码迁移到 dask distributed
以使用 worker。我遇到的问题是,如果我使用 dask
提交方法,代码会一直等到所有函数结束,我需要代码才能继续。如何实现?
client = Client('127.0.0.1:8786')
future1 = client.submit(function_to_submit, 1)
future3 = client.submit(function_to_submit, 3)
L = [future1, future3]
client.gather(L) # <-- this waits until all the futures end
Dask distributed 有一个 fire_and_forget
method which is an alternative to e.g. client.compute
or dask.distributed.wait
如果你希望 调度程序 挂起任务,即使 futures 已经超出 [=22= 的范围] 提交它们的进程。
from dask.distributed import Client, fire_and_forget
client = Client('127.0.0.1:8786')
fire_and_forget(client.submit(function_to_submit, 1))
fire_and_forget(client.submit(function_to_submit, 3))
# your script can now end and the scheduler will
# continue executing these until they end or the
# scheduler is terminated
有关示例和其他使用模式,请参阅 dask.distributed docs on Futures。
我有这个 python 代码,它使用 apscheduler
库来提交进程,它工作正常:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
array = [ 1, 3, 5, 7]
for elem in array:
scheduler.add_job(function_to_submit, kwargs={ 'elem': elem })
scheduler.start()
def function_to_submit(elem):
print(str(elem))
请注意,进程是并行提交的,并且代码不会等待进程结束。
我需要的是将此代码迁移到 dask distributed
以使用 worker。我遇到的问题是,如果我使用 dask
提交方法,代码会一直等到所有函数结束,我需要代码才能继续。如何实现?
client = Client('127.0.0.1:8786')
future1 = client.submit(function_to_submit, 1)
future3 = client.submit(function_to_submit, 3)
L = [future1, future3]
client.gather(L) # <-- this waits until all the futures end
Dask distributed 有一个 fire_and_forget
method which is an alternative to e.g. client.compute
or dask.distributed.wait
如果你希望 调度程序 挂起任务,即使 futures 已经超出 [=22= 的范围] 提交它们的进程。
from dask.distributed import Client, fire_and_forget
client = Client('127.0.0.1:8786')
fire_and_forget(client.submit(function_to_submit, 1))
fire_and_forget(client.submit(function_to_submit, 3))
# your script can now end and the scheduler will
# continue executing these until they end or the
# scheduler is terminated
有关示例和其他使用模式,请参阅 dask.distributed docs on Futures。