如何通过另一个脚本调用 运行 Dask Client?

How to run Dask Client via call from another script?

我有一个在 Luigi 中完成的处理,在其中一个阶段中,我在 DataFrame 中执行一系列计算。为了加快速度,我决定使用本地 Dask 集群。当我通过 Python 或 Jupyter 运行 时,集群启动并且我 运行 一切正常,但是当它 运行 在 Luigi 内部时,它会出现以下错误:

UserWarning: Failed to start diagnostic server on port 8787.

df = func(params)
df.to_csv('...')

def func(params):
  df = params.get('df')
  client = Client()
  result = [client.submit(sample, row) for index, row in df.iterrows()]
  result = client.gather(result)
  new_df = pd.DataFrame(result)
  return df

如何解决?

这是未经测试的代码(没有使用过 luigi
下面的代码(作为单独的模块)怎么样 -

from dask.distributed import Client  
df = func(params)
df.to_csv('...')

def func(params):
  df = params.get('df')
  result = [client.submit(sample, row) for index, row in df.iterrows()]
  result = client.gather(result)
  new_df = pd.DataFrame(result)
  return df 

if __name__ == "__main__":  
    with Client() as client:  
        df_result = func(params)