任务分布式。如何在正在计算的函数中获取任务密钥ID?
Dask-distributed. How to get task key ID in the function being calculated?
我使用 dask.distributed 进行的计算包括创建名称包含 UUID4 的中间文件,这些中间文件标识了该工作块。
pairs = '{}\n{}\n{}\n{}'.format(list1, list2, list3, ...)
file_path = os.path.join(job_output_root, 'pairs',
'pairs-{}.txt'.format(str(uuid.uuid4()).replace('-', '')))
file(file_path, 'wt').writelines(pairs)
同时,dask分布式集群中的所有任务都有唯一的key。因此,使用该密钥 ID 作为文件名是很自然的。
可以吗?
有两种方法可以解决这个问题:
- 您确定uuid并将其传递给Dask(已实现)
- Dask 确定 uuid 并将其传递给您的函数(未实现,但可能)
您将 uuid 传递给 Dask
.submit
等函数接受 key=
关键字参数,您可以在其中指定要使用的键
>>> e.submit(inc, 1, key='inc-12345')
<Future: status: pending, key: inc-12345>
同样 dask.delayed 函数支持 dask_key_name
关键字参数
>>> value = delayed(inc)(1, dask_key_name='inc-12345')
您从 Dask 获得密钥
调度程序在执行每个任务期间将这样的上下文信息放入每个线程的全局变量中。从版本 1.13 开始,它可用如下:
def your_function(...):
from distributed.worker import thread_state
key = thread_state.key
future = e.submit(your_function, ...)
我使用 dask.distributed 进行的计算包括创建名称包含 UUID4 的中间文件,这些中间文件标识了该工作块。
pairs = '{}\n{}\n{}\n{}'.format(list1, list2, list3, ...)
file_path = os.path.join(job_output_root, 'pairs',
'pairs-{}.txt'.format(str(uuid.uuid4()).replace('-', '')))
file(file_path, 'wt').writelines(pairs)
同时,dask分布式集群中的所有任务都有唯一的key。因此,使用该密钥 ID 作为文件名是很自然的。
可以吗?
有两种方法可以解决这个问题:
- 您确定uuid并将其传递给Dask(已实现)
- Dask 确定 uuid 并将其传递给您的函数(未实现,但可能)
您将 uuid 传递给 Dask
.submit
等函数接受 key=
关键字参数,您可以在其中指定要使用的键
>>> e.submit(inc, 1, key='inc-12345')
<Future: status: pending, key: inc-12345>
同样 dask.delayed 函数支持 dask_key_name
关键字参数
>>> value = delayed(inc)(1, dask_key_name='inc-12345')
您从 Dask 获得密钥
调度程序在执行每个任务期间将这样的上下文信息放入每个线程的全局变量中。从版本 1.13 开始,它可用如下:
def your_function(...):
from distributed.worker import thread_state
key = thread_state.key
future = e.submit(your_function, ...)