Dask DataFrame Coiled KilledWorker read_sql

I am trying to run a Dask cluster alongside a Dash application to analyze a very large dataset. With a LocalCluster everything works: the cluster starts and the Dask DataFrame computations succeed. The Dash application itself is launched with gunicorn.

Unfortunately, things broke when I tried to move the cluster to Coiled.

import coiled
import dask.dataframe as dd
from dask.distributed import Client

# (`log`, `table`, and `conn_string` are defined elsewhere in the application.)

# Build the conda environment that the scheduler and workers will run in.
coiled.create_software_environment(
    name="my-conda-env",
    conda={
        "channels": ["conda-forge", "defaults"],
        "dependencies": ["dask", "dash"],
    },
)

# Define the hardware profile the cluster should use.
coiled.create_cluster_configuration(
    name="my-cluster-config",
    scheduler_cpu=1,
    scheduler_memory="1 GiB",
    worker_cpu=2,
    worker_memory="1 GiB",
    software="my-conda-env"
)

# Start the cluster and attach a client. Note that the "my-cluster-config"
# configuration created above is not referenced here.
cluster = coiled.Cluster(n_workers=2)
CLIENT = Client(cluster)

dd_bills_df = dd.read_sql_table(
    table, conn_string, npartitions=10, index_col='DB_BillID'
)
# Publish the DataFrame under a name so any client can fetch it later.
CLIENT.publish_dataset(bills=dd_bills_df)
del dd_bills_df

log.debug(CLIENT.list_datasets())

# Retrieve the published dataset and run the aggregation that fails.
x = CLIENT.get_dataset('bills').persist()
log.debug(x.groupby('BillType').count().compute())

The cluster is created successfully, the dataset is published to it successfully, and the client pulls the dataset into the variable x successfully. The problem occurs during the groupby() computation.

[2021-12-03 17:40:30 -0600] [78928] [ERROR] Exception in worker process
Traceback (most recent call last):
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/arbiter.py", line 589, in spawn_worker
    worker.init_process()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/workers/base.py", line 134, in init_process
    self.load_wsgi()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/workers/base.py", line 146, in load_wsgi
    self.wsgi = self.app.wsgi()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/base.py", line 67, in wsgi
    self.callable = self.load()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/wsgiapp.py", line 58, in load
    return self.load_wsgiapp()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/wsgiapp.py", line 48, in load_wsgiapp
    return util.import_app(self.app_uri)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/util.py", line 359, in import_app
    mod = importlib.import_module(module)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 855, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/Users/leowotzak/PenHole/test-containers2/src/application.py", line 61, in <module>
    log.debug(x.groupby('BillType').count().compute())
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/dask/base.py", line 288, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/dask/base.py", line 571, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 2725, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 1980, in gather
    return self.sync(
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 868, in sync
    return sync(
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/utils.py", line 332, in sync
    raise exc.with_traceback(tb)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/utils.py", line 315, in f
    result[0] = yield future
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 1845, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('_read_sql_chunk-5519d13b-b80d-468e-afd5-5f072b9adbec', <WorkerState 'tls://10.4.27.61:40673', name: coiled-dask-leowotzc2-75566-worker-6a0538671d, status: closed, memory: 0, processing: 10>)

Here is the log output just before the crash:

DEBUG:application:Dask DataFrame Structure:
               BillName BillType ByRequest Congress EnactedAs    IntroducedAt BillNumber OfficialTitle PopularTitle ShortTitle CurrentStatus BillSubjectTopTerm     URL TextURL  DB_LastModDate  DB_CreatedDate
npartitions=10                                                                                                                                                                                                 
1.0              object   object     int64   object    object  datetime64[ns]     object        object       object     object        object             object  object  object  datetime64[ns]  datetime64[ns]
2739.9              ...      ...       ...      ...       ...             ...        ...           ...          ...        ...           ...                ...     ...     ...             ...             ...
...                 ...      ...       ...      ...       ...             ...        ...           ...          ...        ...           ...                ...     ...     ...             ...             ...
24651.1             ...      ...       ...      ...       ...             ...        ...           ...          ...        ...           ...                ...     ...     ...             ...             ...
27390.0             ...      ...       ...      ...       ...             ...        ...           ...          ...        ...           ...                ...     ...     ...             ...             ...
Dask Name: from-delayed, 20 tasks
DEBUG:application:('bills',)

I have tried increasing the memory allocated to each worker and the number of partitions in the Dask DataFrame, to no avail. I am struggling to figure out what is killing the workers. Has anyone else run into this error?

If the dataset is very large, 1 GiB for the workers and the scheduler can be very limiting. There are two options to try (sketches follow the list):

  1. Set the worker and scheduler memory to a level comparable to your local machine.

  2. Try the Coiled version of the code on a fairly small subset of the table.
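
For option 1, a minimal sketch that reuses the configuration call from the question; the 8 GiB values are placeholders, to be matched to the machine where the LocalCluster run succeeded. For option 2, dask's read_sql_table accepts a limits argument that restricts the range of index values read; the bounds below are placeholders as well.

coiled.create_cluster_configuration(
    name="my-cluster-config",
    scheduler_cpu=1,
    scheduler_memory="8 GiB",  # placeholder; was "1 GiB"
    worker_cpu=2,
    worker_memory="8 GiB",     # placeholder; was "1 GiB"
    software="my-conda-env"
)

# Read only a slice of the table while debugging (placeholder index bounds).
dd_small = dd.read_sql_table(
    table, conn_string, npartitions=2, index_col='DB_BillID',
    limits=(1, 10000),
)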

When running a large groupby operation on the result, you can try the following (a short sketch follows the list):

  • If any of the columns are categorical, be sure to set observed=True so that only the categories that actually occur in each group appear in the result. This is a quirk of pandas groupby operations, but it can really blow up the result in dask DataFrames.
  • Consider using split_out in your aggregation call if supported, e.g. df.groupby(large_set).mean(split_out=8), where the value is the number of output partitions. By default the result of a groupby operation comes back as a single partition; split_out is significantly slower but will not blow up your memory.
  • If possible, consider using df.map_partitions.
  • Reduce the size of the data in each partition as a preprocessing step.
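
A minimal sketch combining the first two suggestions, applied to the DataFrame from the question (the split_out value is arbitrary, and observed=True only takes effect once the grouping column is categorical):

# Sketch: cast the grouping column to categorical so observed=True applies,
# then spread the aggregated result over several partitions via split_out.
x['BillType'] = x['BillType'].astype('category')
result = x.groupby('BillType', observed=True).count(split_out=4)
log.debug(result.compute())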

Solution

The source of the error was a misconfigured software environment for the dask-worker and dask-scheduler processes; it had nothing to do with Coiled or with the code samples in the original post.

The dask-scheduler and dask-worker processes run inside docker containers on EC2 instances. The following command was used to start them:

sudo docker run -it --net=host daskdev/dask:latest dask-worker <host>:<port>

daskdev/dask is described in the documentation as follows:

This is a normal debian + miniconda image with the full Dask conda package (including the distributed scheduler), Numpy, and Pandas. This image is about 1GB in size.

The problem is that dask.dataframe.read_sql_table(...) uses sqlalchemy and, by extension, a database driver such as pymysql. Neither is included in this base image. To fix this, the earlier docker run command can be modified as follows:

sudo docker run -it -e EXTRA_PIP_PACKAGES="sqlalchemy pymysql" --net=host daskdev/dask:latest dask-worker <host>:<port>
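
Equivalently, when Coiled builds the software environment (rather than the raw docker containers used above), the same dependencies can be added to the environment from the original post. A sketch, assuming the conda packages sqlalchemy and pymysql cover the driver in use:

# Sketch: include the SQL dependencies in the cluster's conda environment so
# the workers can execute the _read_sql_chunk tasks.
coiled.create_software_environment(
    name="my-conda-env",
    conda={
        "channels": ["conda-forge", "defaults"],
        "dependencies": ["dask", "dash", "sqlalchemy", "pymysql"],
    },
)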