dask - 在超过 RAM 的大型数据帧上应用函数
dask - Applying a function over a large dataframe which is more than RAM
相信 Dask 框架能够处理大小超过 RAM 的数据集。然而,我无法成功地将它应用到我的问题中,这听起来像这样:
我有一个巨大的 .csv 文件 (1.8Gb),其中包含用户评论的文本,以及 8Gb 的 RAM。目标是预处理给定的数据(首先对句子进行标记)。为了实现这个,我运行下面的代码:
if __name__ == '__main__':
client = Client(n_workers=3, memory_limit='1.5GB', processes=True)
df = dd.read_csv('texts_no_n', dtype={'user_id': int, 'post_id': int, 'text': str})
print('Tokenizing sents')
def tokenize(df):
df['text'] = df.text.apply(lambda post: nltk.sent_tokenize(post, language='russian'))
print('tokenized')
return df
df = df.map_partitions(tokenize, meta=df)
df.compute()
Dask 将我的数据帧拆分为 20 个分区。
我希望 Dask 工作人员对每个分区进行迭代:
- 标记文本(运行
tokenize(df_part)
)并 return 一个新的,
给定数据帧的预处理部分
- 释放内存,用于从文件读取分区。在执行 'compute' 方法后总是如此
在遍历所有分区后,我希望 Dask 连接所有预处理分区和 return 一个完整的预处理数据帧。
这种行为对我来说似乎合乎逻辑并且最节省内存,尽管实践表明 Dask 在处理整个数据帧之前不会释放内存。
计算完 20 个分区中的 12 个分区后,我 运行 内存不足,Dask 似乎正试图将工作人员的数据转储到磁盘。看看输出:
Tokenizing sents
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 1.05 GB -- Worker memory limit: 1.50 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 1.05 GB -- Worker memory limit: 1.50 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 979.51 MB -- Worker memory limit: 1.50 GB
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
由于内存泄漏,调度程序重新启动了所有工作程序。大量 RAM 释放,token 化过程重新开始(发生在图中 RAM 急剧下降的时候)
我想,当工人重新启动时,他们会从头开始工作,否则我的数据预处理最终会完成。因此,重启workers不符合我的需求。
在 运行 同一个进程多次运行后,调度程序终止工作进程,代码终止。
我的问题是:
1) 是否可以使用 Dask 或任何其他工具对大数据进行多处理预处理?
我本可以使用 pandas' 数据框管理这个 1.8Gb 的数据集,仅使用一个过程,但出于教育目的,我问:如果我的数据集超过我的 RAM 怎么办?比如说,10Gb。
2)为什么Dask的worker不能将他们为每个分区计算的数据转储到磁盘以释放RAM?
输出显示工作人员没有数据可存储,但事实并非如此,因为我的 RAM 中充满了数据。如果一个分区大小大约是 60 Mb(就像我的情况一样),Dask 不能只转储一些分区吗?
还有一点需要思考的是:
考虑 3 名工人的情况。如果每个 worker 处理的数据量大致相同,那么对于我的 1.8Gb 来说,一个进程使用的最大内存量应该等于大约
1) 1.8Gb / 3 * 2 = 1.2Gb
,想要的是:2) 1.8Gb / 3 = 600Mb
在第一种情况下,我将结果乘以 2,假设 df = df.map_partitions(tokenize, meta=df)
花费的数据量等于给定数据量加上已处理数据量(在我的情况下大致相同) .数据消耗的第二个公式是我想要的技术之一,如上所述(我希望 Dask 工作的方式)。
问题是我没有这么大的 RAM 来容纳第一个公式所用的数据。
Dask 会如您所愿。它加载一大块数据,处理它,然后如果它可以释放它。但是,您可能 运行 会遇到一些问题:
你正在调用 df.compute
这意味着你要求 Dask return 你的整个数据集作为一个内存中的 pandas 数据帧。相反,您可能想尝试 df.to_parquet(...)
之类的操作,以便 Dask 知道您实际上想将结果写入磁盘或某些其他聚合,以便您的输出适合内存。
Dask 将 运行 您的许多任务同时并行,因此它会同时加载许多块。
您可能想在此处查看 Dask 的最佳实践:https://docs.dask.org/en/latest/best-practices.html
终于可以回答我自己的问题了。
如实践(和文档)所示,处理 dask 的最佳方法是将其与 .parquet 格式一起使用。起初,我用 df.to_parquet(dir_name)
将我的大文件分成许多 .parquet 文件,然后用 dd.read_parquet(dir_name)
加载它们并应用我的函数。
以下代码对我有用:
def preprocess_df(df): # To pass to 'map_partition'
mystem = Mystem() # Most important to set it here! Don't pass objects as an argument
df['text'] = df['text'].apply(lambda x: pr.preprocess_post(x, mystem))
mystem.close()
return df
if __name__ == '__main__':
client = Client(n_workers=4)
# Splitting the big file
df = dd.read_csv('texts.csv', dtype={'user_id': int, 'post_id': int, 'text': str}) # Read a big data file
df = df.repartition(npartitions=df.npartitions*8) # 8 migh be too high, try with lower values at first (e.g., 2 or don't repartition at all)
df.to_parquet(dir_name) # convert .csv file to .parquet parts
# Loading the splitted file parts
df = dd.read_parquet(dir_name)
# Applying the function
df = df.map_partitions(preprocess_df, meta={'user_id': int, 'post_id': int, 'text': object}) # Be sure not to '.compute' here
df.to_parquet('preprocesed.parquet')
client.close()
RAM 消耗没有超过 50%。
我猜,有助于减少 RAM 消耗的不是 .parquet 格式,而是将文件拆分成多个部分。
Update: Be careful when passing objects (mystem
) to the function (preprocess_df
), on which 'map_partition' is applied, because it might result in an unexpected behaviour (cause all the workers will try to shape this object, which is not what we want in most of cases). If you need to pass additional 'multiprocessingly-problematic' objects, define them inside the function itself (like in the 3rd line: mystem = Mystem()
).
相信 Dask 框架能够处理大小超过 RAM 的数据集。然而,我无法成功地将它应用到我的问题中,这听起来像这样:
我有一个巨大的 .csv 文件 (1.8Gb),其中包含用户评论的文本,以及 8Gb 的 RAM。目标是预处理给定的数据(首先对句子进行标记)。为了实现这个,我运行下面的代码:
if __name__ == '__main__':
client = Client(n_workers=3, memory_limit='1.5GB', processes=True)
df = dd.read_csv('texts_no_n', dtype={'user_id': int, 'post_id': int, 'text': str})
print('Tokenizing sents')
def tokenize(df):
df['text'] = df.text.apply(lambda post: nltk.sent_tokenize(post, language='russian'))
print('tokenized')
return df
df = df.map_partitions(tokenize, meta=df)
df.compute()
Dask 将我的数据帧拆分为 20 个分区。
我希望 Dask 工作人员对每个分区进行迭代:
- 标记文本(运行
tokenize(df_part)
)并 return 一个新的, 给定数据帧的预处理部分 - 释放内存,用于从文件读取分区。在执行 'compute' 方法后总是如此
在遍历所有分区后,我希望 Dask 连接所有预处理分区和 return 一个完整的预处理数据帧。
这种行为对我来说似乎合乎逻辑并且最节省内存,尽管实践表明 Dask 在处理整个数据帧之前不会释放内存。
计算完 20 个分区中的 12 个分区后,我 运行 内存不足,Dask 似乎正试图将工作人员的数据转储到磁盘。看看输出:
Tokenizing sents tokenized tokenized tokenized tokenized tokenized tokenized tokenized tokenized tokenized tokenized tokenized tokenized distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 1.05 GB -- Worker memory limit: 1.50 GB distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 1.05 GB -- Worker memory limit: 1.50 GB distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 979.51 MB -- Worker memory limit: 1.50 GB distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Restarting worker
由于内存泄漏,调度程序重新启动了所有工作程序。大量 RAM 释放,token 化过程重新开始(发生在图中 RAM 急剧下降的时候)
我想,当工人重新启动时,他们会从头开始工作,否则我的数据预处理最终会完成。因此,重启workers不符合我的需求。
在 运行 同一个进程多次运行后,调度程序终止工作进程,代码终止。
我的问题是:
1) 是否可以使用 Dask 或任何其他工具对大数据进行多处理预处理?
我本可以使用 pandas' 数据框管理这个 1.8Gb 的数据集,仅使用一个过程,但出于教育目的,我问:如果我的数据集超过我的 RAM 怎么办?比如说,10Gb。
2)为什么Dask的worker不能将他们为每个分区计算的数据转储到磁盘以释放RAM?
输出显示工作人员没有数据可存储,但事实并非如此,因为我的 RAM 中充满了数据。如果一个分区大小大约是 60 Mb(就像我的情况一样),Dask 不能只转储一些分区吗?
还有一点需要思考的是:
考虑 3 名工人的情况。如果每个 worker 处理的数据量大致相同,那么对于我的 1.8Gb 来说,一个进程使用的最大内存量应该等于大约
1) 1.8Gb / 3 * 2 = 1.2Gb
,想要的是:2) 1.8Gb / 3 = 600Mb
在第一种情况下,我将结果乘以 2,假设 df = df.map_partitions(tokenize, meta=df)
花费的数据量等于给定数据量加上已处理数据量(在我的情况下大致相同) .数据消耗的第二个公式是我想要的技术之一,如上所述(我希望 Dask 工作的方式)。
问题是我没有这么大的 RAM 来容纳第一个公式所用的数据。
Dask 会如您所愿。它加载一大块数据,处理它,然后如果它可以释放它。但是,您可能 运行 会遇到一些问题:
你正在调用
df.compute
这意味着你要求 Dask return 你的整个数据集作为一个内存中的 pandas 数据帧。相反,您可能想尝试df.to_parquet(...)
之类的操作,以便 Dask 知道您实际上想将结果写入磁盘或某些其他聚合,以便您的输出适合内存。Dask 将 运行 您的许多任务同时并行,因此它会同时加载许多块。
您可能想在此处查看 Dask 的最佳实践:https://docs.dask.org/en/latest/best-practices.html
终于可以回答我自己的问题了。
如实践(和文档)所示,处理 dask 的最佳方法是将其与 .parquet 格式一起使用。起初,我用 df.to_parquet(dir_name)
将我的大文件分成许多 .parquet 文件,然后用 dd.read_parquet(dir_name)
加载它们并应用我的函数。
以下代码对我有用:
def preprocess_df(df): # To pass to 'map_partition'
mystem = Mystem() # Most important to set it here! Don't pass objects as an argument
df['text'] = df['text'].apply(lambda x: pr.preprocess_post(x, mystem))
mystem.close()
return df
if __name__ == '__main__':
client = Client(n_workers=4)
# Splitting the big file
df = dd.read_csv('texts.csv', dtype={'user_id': int, 'post_id': int, 'text': str}) # Read a big data file
df = df.repartition(npartitions=df.npartitions*8) # 8 migh be too high, try with lower values at first (e.g., 2 or don't repartition at all)
df.to_parquet(dir_name) # convert .csv file to .parquet parts
# Loading the splitted file parts
df = dd.read_parquet(dir_name)
# Applying the function
df = df.map_partitions(preprocess_df, meta={'user_id': int, 'post_id': int, 'text': object}) # Be sure not to '.compute' here
df.to_parquet('preprocesed.parquet')
client.close()
RAM 消耗没有超过 50%。
我猜,有助于减少 RAM 消耗的不是 .parquet 格式,而是将文件拆分成多个部分。
Update: Be careful when passing objects (
mystem
) to the function (preprocess_df
), on which 'map_partition' is applied, because it might result in an unexpected behaviour (cause all the workers will try to shape this object, which is not what we want in most of cases). If you need to pass additional 'multiprocessingly-problematic' objects, define them inside the function itself (like in the 3rd line:mystem = Mystem()
).