Dask - 诊断仪表板 - 关于任务的自定义信息

Dask - diagnostics dashboard - custom info about task

我正在使用 Dask 来安排和 运行 研究批次。 这些大多会产生副作用并且非常重(从几分钟到几个小时不等)。任务之间没有通信。

在代码中它看起来像这样,首先我将所有批次传递给处理:

def process_batches(batches: Iterator[Batch], log_dir: Path):

    cluster = LocalCluster(
        n_workers=os.cpu_count(),
        threads_per_worker=1
    )

    client = Client(cluster)
    futures = []

    for batch in batches:
        futures += process_batch(batch, client, log_dir)

    progress(futures)

然后我将每批次的重复项作为任务提交:

def process_batch(batch: Batch, client: Client, log_dir: Path) -> List[Future]:
    batch_dir = log_dir.joinpath(batch.nice_hash)
    batch_futures = []

    num_workers = len(client.scheduler_info()['workers'])

    with Logger(batch_dir, clear_dir=True) as logger:
        logger.save_json(batch.as_dict, 'batch')

        for repetition in range(batch.n_repeats):
            cpu_index = repetition % num_workers

            future = client.submit(
                process_batch_repetition,
                batch,
                repetition,
                cpu_index,
                logger
            )

            batch_futures.append(future)

    return batch_futures

有什么方法可以将有关已提交任务的一些自定义信息传递到仪表板吗? 我所看到的只是任务 process_batch_repetition。我可以用自定义字符串替换它吗,这样我就可以看到目前正在处理哪些批处理配置?

从 Dask 的 BDFL 得到了答案 mrocklin

You can use the key= keyword to specify a key for the future. This should be unique per future. Dask will use the prefix of the key name to determine how it is rendered on the dashboard. See the docstring for dask.utils.key_split for examples on how a key prefix is generated from a key.

所以你可以这样使用它:

future = client.submit(
    process_batch_repetition,
    batch,
    repetition,
    cpu_index,
    logger,
    key=f'{str(batch)}_repetition_{repetition}'
)

您只需为此任务传递一个唯一的字符串。有一些禁止使用的字符(即空格),所以预计会出现一些关键错误。