运行 dask map_partition 在多个 worker 中运行

Running dask map_partition functions in multiple workers

我有一个使用五个 docker 容器实现的 dask 架构:一个客户端、一个调度程序和三个工作程序。我还有一个大型 dask 数据框,它以 parquet 格式存储在 docker 卷中。 dataframe 是用 3 个分区创建的,所以有 3 个文件(每个分区一个文件)。

我需要 运行 数据帧上的一个函数 map_partitions,其中每个工作人员将处理一个分区。

我的尝试:

def my_function(dfx):
    return dfx['abc'] = dfx['def'] + 1

df = dd.read_parquet(... path to parquet file)

client = Client('127.0.0.1:8786')

with joblib.parallel_backend('dask'):
      df = df.map_partitions(my_function) 
      

这是正确的做法吗?如何告诉 dask 在 with 语句中使用 client 变量,以便函数 运行 在工人身上?我需要 df.compute() 才能开始执行吗?

注意:删除 'with' 语句,如果 dask 客户端在 Jupyter 上是 运行,这就可以正常工作。问题是当 Dask 客户端在 Docker 上 运行 时,因为 Dask 在 Web 应用程序而不是 Docker 容器中创建工作人员。

更新

docker 撰写文件:

version: '3'

services:   

  web:
    image: img-python-01
    container_name: cont_flask
    volumes:
      - c:/visualcode-py:/code
      - c:/conf:/conf
      - vol_dask_data:/data
      - vol_dask_model:/model
    ports:
      - "5000:5000"
    working_dir: /code
    environment:
      - app.config=/conf/py.app.json
      - common.config=/conf/py.common.json 
      - CUDA_VISIBLE_DEVICES=''
    entrypoint:
      - gunicorn
    command:
      - -t 7200
      - -b 0.0.0.0:5000
      - --reload
      - app.frontend.app:app
      
      
  scheduler:
    image: img-python-01
    container_name: cont_scheduler
    ports:
      - "8787:8787"
      - "8786:8786"
    entrypoint:
      - dask-scheduler

  worker:
    image: img-python-01
    depends_on:
      - scheduler
    environment:
      - app.config=/conf/py.app.json
      - common.config=/conf/py.common.json 
      - PYTHONPATH=/code
      - MODEL_PATH=/model/rfc_model.pkl
      - PREPROCESSING_PATH=/model/data_columns.pkl
      - SCHEDULER_ADDRESS=scheduler
      - SCHEDULER_PORT=8786
      - CUDA_VISIBLE_DEVICES=''
    working_dir: /code
    volumes:
      - c:/visualcode-py:/code
      - c:/conf:/conf
      - c:/winfiles:/winfiles
      - vol_dask_data:/data
      - vol_dask_model:/model
    entrypoint:
      - dask-worker
    command:
      - scheduler:8786
    
volumes:
  vol_dask_data:
     name: vol_dask_data
  vol_dask_model:
     name: vol_dask_model

docker-compose up -d --scale worker=4 开始,web 上的 flask/gunicorn 应用程序 运行。

注意:当我 运行 a client.submit(),工人 运行 在容器上时,此配置工作正常。

更新 2

这是适用于当前 docker 撰写文件的代码:

futures1 = client.submit(process_loans, exec_id, 1, dataset, w1)
futures2 = client.submit(process_loans, exec_id, 2, dataset, w2)

worker_responses = client.gather([futures1, futures2])

我在 Dask 仪表板中看到函数 process_loans 在工作容器上 运行ning

python 片段似乎没有有效地使用 dask API。可能是你的实际功能比较复杂,所以map_partitions免不了,但是还是先看简单的情况吧:

def my_function(dfx):
    # return dfx['abc'] = dfx['def'] + 1
    # the above returns the result of assignment
    # we need to separate the assignment and return statements
    dfx['abc'] = dfx['def'] + 1
    return dfx

df = dd.read_parquet(... path to parquet file)

client = Client('127.0.0.1:8786')

with joblib.parallel_backend('dask'):
      df = df.map_partitions(my_function) 

另一种 re-write 上面(对于这个基本情况)的方法是显式分配新的列值:

df = dd.read_parquet(... path to parquet file)
df['abc'] = df['def'] + 1

或使用.assign方法:

df = (
    dd.read_parquet(path_to_parquet_file)
    .assign(abc=lambda df: df['def'] + 1)
)

其他问题方面:

  • 如果在上下文之外创建clientjoblib将使用现有客户端;

  • 为了将每个分区的计算限制为一个工作人员,最简单的方法是为每个工作人员分配一个特定的资源单元 foo 并要求每个计算使用一个 foo可用资源,请参阅 docs on resources;

  • 是否需要.compute取决于下游发生的事情。如果数据可以放入可用内存并且内存中有数据是有效的,那么应该执行 .compute。否则,将任何实际计算延迟到最后一步可能会更有效。例如,如果此代码的最终结果是将更新后的数据保存到另一组镶木地板文件中,则无需发出 .compute,因为 dask 将在 .to_parquet 时触发计算被执行。

在我看来,系统的各个部分之间存在一些混淆。

首先让我指出,给定的函数会产生语法错误。也许你的意思是

def my_function(dfx): 
    dfx['abc'] = dfx['def'] + 1
    return dfx

(这个在另一个回答里有提到)

其次,这里为什么要涉及joblib?您似乎没有在任何地方向 joblib 提交工作,它根本没有被使用。您所需要的只是临时 API 电话和您的客户。

df2 = df.map_partitions(my_function)

然后用 df2 做任何您想做的事。这还没有开始任何执行,它制作了一个要执行的操作图。

如果您想将整个结果数据集解析到客户端内存中(这可能不是您想要的!),您可以这样做

out = df2.compute()

这将自动使用您的分布式调度程序。 你也可以更明确

f = client.compute(df2)

which returns a future 你可以等待(f.result()distributed 中的其他功能)或允许继续背景。