Dask DataFrame Coiled KilledWorker read_sql
I am trying to run a Dask cluster alongside a Dash application to analyze a very large dataset. I can run a LocalCluster successfully, and the Dask DataFrame computations complete without issue. The Dash application is started with a gunicorn command. Unfortunately, problems appeared when I tried to move the cluster to coiled.
import coiled
import dask.dataframe as dd
from dask.distributed import Client

# Build the software environment and cluster configuration on Coiled
coiled.create_software_environment(
    name="my-conda-env",
    conda={
        "channels": ["conda-forge", "defaults"],
        "dependencies": ["dask", "dash"],
    },
)
coiled.create_cluster_configuration(
    name="my-cluster-config",
    scheduler_cpu=1,
    scheduler_memory="1 GiB",
    worker_cpu=2,
    worker_memory="1 GiB",
    software="my-conda-env",
)

cluster = coiled.Cluster(n_workers=2)
CLIENT = Client(cluster)

# `table`, `conn_string`, and `log` are defined elsewhere in application.py
dd_bills_df = dd.read_sql_table(
    table, conn_string, npartitions=10, index_col='DB_BillID'
)

# Publish the DataFrame on the cluster, then pull it back and compute
CLIENT.publish_dataset(bills=dd_bills_df)
del dd_bills_df
log.debug(CLIENT.list_datasets())
x = CLIENT.get_dataset('bills').persist()
log.debug(x.groupby('BillType').count().compute())
The cluster is created successfully, the dataset is published to the cluster successfully, and the client pulls the dataset into the variable x without error. The failure occurs during the groupby() computation:
[2021-12-03 17:40:30 -0600] [78928] [ERROR] Exception in worker process
Traceback (most recent call last):
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/arbiter.py", line 589, in spawn_worker
worker.init_process()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/workers/base.py", line 134, in init_process
self.load_wsgi()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/workers/base.py", line 146, in load_wsgi
self.wsgi = self.app.wsgi()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/base.py", line 67, in wsgi
self.callable = self.load()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/wsgiapp.py", line 58, in load
return self.load_wsgiapp()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/wsgiapp.py", line 48, in load_wsgiapp
return util.import_app(self.app_uri)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/util.py", line 359, in import_app
mod = importlib.import_module(module)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 855, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "/Users/leowotzak/PenHole/test-containers2/src/application.py", line 61, in <module>
log.debug(x.groupby('BillType').count().compute())
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/dask/base.py", line 288, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/dask/base.py", line 571, in compute
results = schedule(dsk, keys, **kwargs)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 2725, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 1980, in gather
return self.sync(
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 868, in sync
return sync(
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/utils.py", line 332, in sync
raise exc.with_traceback(tb)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/utils.py", line 315, in f
result[0] = yield future
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 1845, in _gather
raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('_read_sql_chunk-5519d13b-b80d-468e-afd5-5f072b9adbec', <WorkerState 'tls://10.4.27.61:40673', name: coiled-dask-leowotzc2-75566-worker-6a0538671d, status: closed, memory: 0, processing: 10>)
Here is the log output immediately before the crash:
DEBUG:application:Dask DataFrame Structure:
BillName BillType ByRequest Congress EnactedAs IntroducedAt BillNumber OfficialTitle PopularTitle ShortTitle CurrentStatus BillSubjectTopTerm URL TextURL DB_LastModDate DB_CreatedDate
npartitions=10
1.0 object object int64 object object datetime64[ns] object object object object object object object object datetime64[ns] datetime64[ns]
2739.9 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
24651.1 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
27390.0 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: from-delayed, 20 tasks
DEBUG:application:('bills',)
I have tried increasing the memory allocated to each worker and the number of partitions in the Dask DataFrame, to no avail. I am struggling to figure out what is killing the workers. Has anyone else run into this error?
If the dataset is very large, 1 GiB for each worker and for the scheduler can be very restrictive. Two options to try:
- Set the worker and scheduler memory to a level comparable to your local machine (see the configuration sketch after this list).
- Try the coiled version of the code on a much smaller subset of the table.
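For the first option, here is a minimal sketch of what a larger configuration might look like. The CPU and memory values, and the configuration keyword passed to coiled.Cluster, are assumptions rather than something from the original post:
# Hypothetical resizing of the question's configuration; pick values that
# roughly match the machine the LocalCluster ran on.
coiled.create_cluster_configuration(
    name="my-cluster-config",
    scheduler_cpu=2,
    scheduler_memory="8 GiB",
    worker_cpu=4,
    worker_memory="16 GiB",
    software="my-conda-env",
)
# Assumes the cluster is pointed at the configuration by name.
cluster = coiled.Cluster(n_workers=2, configuration="my-cluster-config")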
When running a large groupby on the result, you can try the following (a combined sketch follows this list):
- If any of the columns are categorical, make sure to set observed=True so that only categories that actually occur in each group appear in the result. This is a quirk of pandas groupby operations, but it can really blow up results in dask.dataframes.
- Consider using the split_out argument in your aggregation call if it is supported, e.g. df.groupby(large_set).mean(split_out=8), so the result is spread over several partitions. By default, a groupby returns its result in a single partition; splitting the output is noticeably slower, but it won't blow up your memory.
- If possible, consider using df.map_partitions as a preprocessing step to reduce the size of the data in each partition.
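To make these suggestions concrete, here is a minimal sketch (not taken from the original post) that combines them; the column selection is an assumption based on the schema shown in the debug output:
# Hypothetical preprocessing: keep only the columns the aggregation needs.
slim = x.map_partitions(lambda pdf: pdf[['BillType', 'BillNumber']])

# observed=True only matters when the grouping column is categorical, but it
# is harmless otherwise; split_out spreads the result over several partitions
# instead of collecting it into one.
counts = slim.groupby('BillType', observed=True).count(split_out=4)
log.debug(counts.compute())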
Solution
The source of the error was a misconfigured software environment for the dask-worker and dask-scheduler processes; it had nothing to do with coiled or with the code sample in the original post.
The dask-scheduler and dask-worker processes were running in docker containers on EC2 instances. The following command was used to start these processes:
sudo docker run -it --net=host daskdev/dask:latest dask-worker <host>:<port>
The daskdev/dask image is described in the documentation as follows:
This is a normal debian + miniconda image with the full Dask conda package (including the distributed scheduler), Numpy, and Pandas. This image is about 1GB in size.
The problem is that dask.dataframe.read_sql_table(...) uses sqlalchemy and, by extension, a database driver such as pymysql. Neither is included in this base image. To fix this, the earlier docker run command can be modified to:
sudo docker run -it -e EXTRA_PIP_PACKAGES="sqlalchemy pymysql" --net=host daskdev/dask:latest dask-worker <host>:<port>
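If the workers were instead launched through a Coiled software environment, as in the question's code, an equivalent fix (a sketch, not what was used here) would be to declare the SQL dependencies in the environment spec:
# Hypothetical variant of the question's environment: include sqlalchemy and a
# database driver so read_sql_table can run on every worker.
coiled.create_software_environment(
    name="my-conda-env",
    conda={
        "channels": ["conda-forge", "defaults"],
        "dependencies": ["dask", "dash", "sqlalchemy", "pymysql"],
    },
)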