dask 分布式 fastparquet 中的处理时间不一致
inconsistent processing time in dask distributed fastparquet
我有一个 hive 格式的 parquet 文件和 snappy 压缩。它适合内存,pandas.info 提供以下数据。
parquet 文件中每组的行数仅为 100K
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
Index: 21547746 entries, YyO+tlZtAXYXoZhNr3Vg3+dfVQvrBVGO8j1mfqe4ZHc= to oE4y2wK5E7OR8zyrCHeW02uTeI6wTwT4QTApEVBNEdM=
Data columns (total 8 columns):
payment_method_id int16
payment_plan_days int16
plan_list_price int16
actual_amount_paid int16
is_auto_renew bool
transaction_date datetime64[ns]
membership_expire_date datetime64[ns]
is_cancel bool
dtypes: bool(2), datetime64[ns](2), int16(4)
memory usage: 698.7+ MB
现在,用 dask 进行一些简单的计算,我得到以下时间
使用线程
>>>time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime()
'Fri Oct 13 23:44:50 2017'
141.98732048354384
'Fri Oct 13 23:44:59 2017'
使用分布式(本地集群)
>>> c=Client()
>>> time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime()
'Fri Oct 13 23:47:04 2017'
141.98732048354384
'Fri Oct 13 23:47:15 2017'
>>>
没关系,每个大约 9 秒。
现在使用多处理,惊喜来了...
>>> time.asctime();ddf.actual_amount_paid.mean().compute(get=dask.multiprocessing.get);time.asctime()
'Fri Oct 13 23:50:43 2017'
141.98732048354384
'Fri Oct 13 23:57:49 2017'
>>>
我希望多处理和 distributed/local 集群处于相同的数量级,可能与线程有一些差异(无论好坏)
但是,多处理花费 47 倍的时间来对 in16 列进行简单平均?
我的环境只是一个全新的 conda 安装,包含所需的模块。没有任何东西的手工挑选。
为什么会有这种差异??我无法管理 dask/distributed 具有可预测的行为,以便能够根据我的问题的性质在不同的调度程序之间明智地进行选择。
这只是一个玩具示例,但我无法找到符合我期望的示例(至少我对阅读文档的理解)。
有什么我应该牢记在心的吗?还是我完全没有抓住要点?
谢谢
JC
使用线程调度程序,每个任务都可以访问进程的所有内存 - 在这种情况下是所有数据 - 因此可以在没有任何内存复制的情况下进行计算。
使用分布式调度程序,调度程序知道哪个线程和哪个工作程序正在生成后续任务所需的数据,或者已经在内存中具有该数据。调度程序的巧妙之处在于将计算转移到合适的工作人员,以避免数据通信和复制。
相反,多进程调度程序倾向于将任务结果发送到主进程或从主进程发送任务结果,这可能涉及大量序列化和复制。有些任务可以融合在一起(通过在链中调用许多 python 函数来组合任务),但有些则不能。任何序列化和复制都需要 CPU 的努力,而且对您来说可能更重要的是记忆 space。如果您的原始数据占系统总量的很大一部分,您可能正在填满物理内存,从而导致速度下降。
我有一个 hive 格式的 parquet 文件和 snappy 压缩。它适合内存,pandas.info 提供以下数据。
parquet 文件中每组的行数仅为 100K
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
Index: 21547746 entries, YyO+tlZtAXYXoZhNr3Vg3+dfVQvrBVGO8j1mfqe4ZHc= to oE4y2wK5E7OR8zyrCHeW02uTeI6wTwT4QTApEVBNEdM=
Data columns (total 8 columns):
payment_method_id int16
payment_plan_days int16
plan_list_price int16
actual_amount_paid int16
is_auto_renew bool
transaction_date datetime64[ns]
membership_expire_date datetime64[ns]
is_cancel bool
dtypes: bool(2), datetime64[ns](2), int16(4)
memory usage: 698.7+ MB
现在,用 dask 进行一些简单的计算,我得到以下时间
使用线程
>>>time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime()
'Fri Oct 13 23:44:50 2017'
141.98732048354384
'Fri Oct 13 23:44:59 2017'
使用分布式(本地集群)
>>> c=Client()
>>> time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime()
'Fri Oct 13 23:47:04 2017'
141.98732048354384
'Fri Oct 13 23:47:15 2017'
>>>
没关系,每个大约 9 秒。
现在使用多处理,惊喜来了...
>>> time.asctime();ddf.actual_amount_paid.mean().compute(get=dask.multiprocessing.get);time.asctime()
'Fri Oct 13 23:50:43 2017'
141.98732048354384
'Fri Oct 13 23:57:49 2017'
>>>
我希望多处理和 distributed/local 集群处于相同的数量级,可能与线程有一些差异(无论好坏)
但是,多处理花费 47 倍的时间来对 in16 列进行简单平均?
我的环境只是一个全新的 conda 安装,包含所需的模块。没有任何东西的手工挑选。
为什么会有这种差异??我无法管理 dask/distributed 具有可预测的行为,以便能够根据我的问题的性质在不同的调度程序之间明智地进行选择。
这只是一个玩具示例,但我无法找到符合我期望的示例(至少我对阅读文档的理解)。
有什么我应该牢记在心的吗?还是我完全没有抓住要点?
谢谢
JC
使用线程调度程序,每个任务都可以访问进程的所有内存 - 在这种情况下是所有数据 - 因此可以在没有任何内存复制的情况下进行计算。
使用分布式调度程序,调度程序知道哪个线程和哪个工作程序正在生成后续任务所需的数据,或者已经在内存中具有该数据。调度程序的巧妙之处在于将计算转移到合适的工作人员,以避免数据通信和复制。
相反,多进程调度程序倾向于将任务结果发送到主进程或从主进程发送任务结果,这可能涉及大量序列化和复制。有些任务可以融合在一起(通过在链中调用许多 python 函数来组合任务),但有些则不能。任何序列化和复制都需要 CPU 的努力,而且对您来说可能更重要的是记忆 space。如果您的原始数据占系统总量的很大一部分,您可能正在填满物理内存,从而导致速度下降。