Dask diagnostics - progress bar with map_partitions / delayed

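I'm using the distributed scheduler and the distributed progress bar. Is there a way to get the progress bar working with DataFrame.map_partitions or with delayed? I think the lack of futures is what keeps the bar from working: if I change my code to use client.submit, the progress bar works.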

import dask
import dask.dataframe as dd
from distributed import Client
from distributed.diagnostics.progressbar import progress

client = Client("tcp://....")
...
ddf = dd.read_parquet("...")

# Attempt 1: map_partitions (the progress bar does not work)
ddf = ddf.map_partitions(..)
progress(ddf)  # no futures to pass
dask.compute(ddf)

# Attempt 2: dask.delayed (the progress bar does not work either)
delayed = [dask.delayed(myfunc)(ddf.get_partition(i)) for i in range(ddf.npartitions)]
progress(delayed)
dask.compute(*delayed)

# Attempt 3: client.submit (the progress bar works)
futures = [client.submit(myfunc, ddf.get_partition(i)) for i in range(ddf.npartitions)]
progress(futures)
dask.compute(*futures)

Is there a way to get the progress bar (or a report of tasks completed vs. total) working for map_partitions or dask.delayed?

import dask
import numpy as np
import pandas as pd
import dask.dataframe as dd
from distributed import Client
from distributed.diagnostics.progressbar import progress
import time

cl = Client("tcp://10.0.2.15:8786")

def wait(df):
    print("Received chunk")
    time.sleep(2)
    print("finish")

df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 4)), columns=list('ABCD'))
ddf = dd.from_pandas(df, npartitions=4)

# despite the variable name, these are Delayed objects, not Futures
futures = [dask.delayed(wait)(ddf.get_partition(i)) for i in range(ddf.npartitions)]
progress(futures)
dask.compute(*futures)

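Yes, you're right: progress is designed to work with futures, or with collections that contain futures. You don't need to submit lots of individual futures to use it, though: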

ddf = ddf.map_partitions(..)
fut = client.compute(ddf)
progress(fut)
# wait on fut, call fut.result() or continue

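Also, don't forget: the distributed scheduler you are using, even on a single machine, comes with a diagnostics dashboard that shows the same information. It is usually at http://localhost:8787, which you can open in any browser.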
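If you are not sure where the dashboard lives (for example on a remote cluster, or when port 8787 was already taken and another port was chosen), the client can report it through its `dashboard_link` property. A minimal sketch, assuming a throwaway in-process cluster; a client connected to `tcp://...` exposes the same property:

```python
from distributed import Client

# Assumption: a throwaway local cluster just to show the property.
client = Client(processes=False)

# An empty string means no dashboard is running (e.g. bokeh is not installed).
link = client.dashboard_link
print(link)

client.close()
```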