CSV 日期解析的 Dask 性能慢?
Slow Dask performance on CSV date parsing?
我一直在对一大堆文件进行大量文本处理,包括大型 CSV 文件和很多很多 XML 小文件。有时我会做汇总计数,但很多时候我会做 NLP 类型的工作,以更深入地了解这些文件中除了标记的或已经结构化的内容之外的内容。
我经常使用多处理库来跨多个 CPU 执行这些计算,但我已经爱上了 Dask 背后的想法,并且在网上和同事都强烈推荐它.
我在这里问了一个关于 Dask 性能的类似问题:
和 MRocklin (https://whosebug.com/users/616616/mrocklin) 让我知道加载大量小文件可能会破坏性能。
然而,当我 运行 它处理单个大文件 (200mb) 时,我仍然无法很好地执行它。这是一个例子:
我有一个 900,000 行的推文 CSV 文件,我想快速加载它并解析 "created_at" 字段。这是我完成的三种方法以及每种方法的基准。我 运行 在配备 16GB 内存的全新 i7 2016 MacBook Pro 上使用此功能。
import pandas
import dask.dataframe as dd
import multiprocessing
%%time
# Single Threaded, no chunking
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", parse_dates = ["created_at"])
print(len(d))
CPU 次:用户 2 分钟 31 秒,系统:807 毫秒,总计:2 分钟 32 秒
挂墙时间:2分32秒
%%time
# Multithreaded chunking
def parse_frame_dates(frame):
frame["created_at"] = pandas.to_datetime(frame["created_at"])
return(frame)
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", chunksize = 100000)
frames = multiprocessing.Pool().imap_unordered(get_count, d)
td = pandas.concat(frames)
print(len(td))
CPU 次:用户 5.65 秒,系统:1.47 秒,总计:7.12 秒
挂墙时间:1分10秒
%%time
# Dask Load
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv",
parse_dates = ["created_at"], blocksize = 10000000).compute()
CPU 次:用户 2 分钟 59 秒,系统:26.2 秒,总计:3 分钟 25 秒
挂墙时间:3分12秒
我在许多不同的 Dask 比较中发现了这类结果,但即使让它正常工作也可能为我指明正确的方向。
简而言之,我怎样才能让 Dask 为这类任务发挥最佳性能?为什么它的表现似乎不如其他方式的单线程和多线程技术?
我怀疑 Pandas read_csv 日期时间解析代码是 pure-python,因此使用线程不会有太大好处,而线程正是 dask.dataframe 使用的默认。
使用进程时您可能会看到更好的性能。
我怀疑以下方法会更快:
import dask.multiprocessing
dask.set_options(get=dask.multiprocessing.get) # set processes as default
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv",
parse_dates = ["created_at"], blocksize = 10000000)
len(d)
进程的问题是 inter-process 通信可能变得昂贵。我在上面明确计算 len(d)
而不是 d.compute()
以避免必须提取工作进程中的所有 pandas 数据帧并将它们移动到主调用进程。在实践中,这很常见,因为人们很少想要完整的数据帧,而是对数据帧进行一些计算。
这里的相关文档是http://dask.readthedocs.io/en/latest/scheduler-choice.html
您可能还想在单台计算机上使用 distributed scheduler 而不是使用多进程调度程序。上面引用的文档中也对此进行了描述。
$ pip install dask distributed
from dask.distributed import Client
c = Client() # create processes and set as default
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv",
parse_dates = ["created_at"], blocksize = 10000000)
len(d)
我一直在对一大堆文件进行大量文本处理,包括大型 CSV 文件和很多很多 XML 小文件。有时我会做汇总计数,但很多时候我会做 NLP 类型的工作,以更深入地了解这些文件中除了标记的或已经结构化的内容之外的内容。
我经常使用多处理库来跨多个 CPU 执行这些计算,但我已经爱上了 Dask 背后的想法,并且在网上和同事都强烈推荐它.
我在这里问了一个关于 Dask 性能的类似问题:
和 MRocklin (https://whosebug.com/users/616616/mrocklin) 让我知道加载大量小文件可能会破坏性能。
然而,当我 运行 它处理单个大文件 (200mb) 时,我仍然无法很好地执行它。这是一个例子:
我有一个 900,000 行的推文 CSV 文件,我想快速加载它并解析 "created_at" 字段。这是我完成的三种方法以及每种方法的基准。我 运行 在配备 16GB 内存的全新 i7 2016 MacBook Pro 上使用此功能。
import pandas
import dask.dataframe as dd
import multiprocessing
%%time
# Single Threaded, no chunking
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", parse_dates = ["created_at"])
print(len(d))
CPU 次:用户 2 分钟 31 秒,系统:807 毫秒,总计:2 分钟 32 秒 挂墙时间:2分32秒
%%time
# Multithreaded chunking
def parse_frame_dates(frame):
frame["created_at"] = pandas.to_datetime(frame["created_at"])
return(frame)
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", chunksize = 100000)
frames = multiprocessing.Pool().imap_unordered(get_count, d)
td = pandas.concat(frames)
print(len(td))
CPU 次:用户 5.65 秒,系统:1.47 秒,总计:7.12 秒 挂墙时间:1分10秒
%%time
# Dask Load
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv",
parse_dates = ["created_at"], blocksize = 10000000).compute()
CPU 次:用户 2 分钟 59 秒,系统:26.2 秒,总计:3 分钟 25 秒 挂墙时间:3分12秒
我在许多不同的 Dask 比较中发现了这类结果,但即使让它正常工作也可能为我指明正确的方向。
简而言之,我怎样才能让 Dask 为这类任务发挥最佳性能?为什么它的表现似乎不如其他方式的单线程和多线程技术?
我怀疑 Pandas read_csv 日期时间解析代码是 pure-python,因此使用线程不会有太大好处,而线程正是 dask.dataframe 使用的默认。
使用进程时您可能会看到更好的性能。
我怀疑以下方法会更快:
import dask.multiprocessing
dask.set_options(get=dask.multiprocessing.get) # set processes as default
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv",
parse_dates = ["created_at"], blocksize = 10000000)
len(d)
进程的问题是 inter-process 通信可能变得昂贵。我在上面明确计算 len(d)
而不是 d.compute()
以避免必须提取工作进程中的所有 pandas 数据帧并将它们移动到主调用进程。在实践中,这很常见,因为人们很少想要完整的数据帧,而是对数据帧进行一些计算。
这里的相关文档是http://dask.readthedocs.io/en/latest/scheduler-choice.html
您可能还想在单台计算机上使用 distributed scheduler 而不是使用多进程调度程序。上面引用的文档中也对此进行了描述。
$ pip install dask distributed
from dask.distributed import Client
c = Client() # create processes and set as default
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv",
parse_dates = ["created_at"], blocksize = 10000000)
len(d)