Dask diagnostics - progress bar with map_partitions / delayed
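I am using the distributed scheduler and the distributed progress bar.
Is there a way to get the progress bar working with DataFrame.map_partitions or dask.delayed? I assume the lack of futures is what keeps the bar from working. If I change my code to use client.submit, the progress bar works.
- The code looks like this: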
import dask
import dask.dataframe as dd
from distributed import Client
from distributed.diagnostics.progressbar import progress
client = Client("tcp://....")
...
ddf = dd.read_parquet("...")
ddf = ddf.map_partitions(..)
progress(ddf) # no futures to pass
dask.compute(ddf)
- The dask.delayed alternative does not work either:
delayed = [dask.delayed(myfunc)(ddf.get_partition(i)) for i in range(ddf.npartitions)]
progress(delayed)
dask.compute(*delayed)
- Client.submit does produce a working progress bar, but the code then fails to execute, and I have not managed to debug it yet.
futures = [client.submit(myfunc, ddf.get_partition(i)) for i in range(ddf.npartitions)]
progress(futures)
dask.compute(*futures)
Is there a way to get the progress bar (or a report of tasks completed vs. total) working for map_partitions or dask.delayed?
- Full code example with delayed:
import dask
import numpy as np
import pandas as pd
import dask.dataframe as dd
from distributed import Client
from distributed.diagnostics.progressbar import progress
import time
cl = Client("tcp://10.0.2.15:8786")
def wait(df):
    print("Received chunk")
    time.sleep(2)
    print("finish")
df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 4)), columns=list('ABCD'))
ddf = dd.from_pandas(df, npartitions=4)
futures = [dask.delayed(wait)(ddf.get_partition(i)) for i in range(ddf.npartitions)]
progress(futures)
dask.compute(*futures)
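Yes, you are right: progress is designed to work with futures, or with collections that contain futures. However, you do not need to submit a large number of futures yourself to use it: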
ddf = ddf.map_partitions(..)
fut = client.compute(ddf)
progress(fut)
# wait on fut, call fut.result() or continue
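Also, do not forget: the distributed scheduler you are using, even on a single machine, comes with a diagnostics dashboard that shows the same information. It is usually available at http://localhost:8787, which you can open in any browser.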
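If you are unsure where the dashboard is being served, the client can report the address itself. The following is a small sketch, assuming a local in-process cluster; with a remote scheduler you would pass its address to Client instead. The variable name link is only illustrative.

```python
from distributed import Client

# Assumption: a local in-process cluster; pass your scheduler address instead.
client = Client(processes=False)

# dashboard_link holds the URL of the scheduler's diagnostics dashboard.
link = client.dashboard_link
print(link)

client.close()
```

The printed URL may differ from the default port if 8787 is already in use on your machine.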