Dask 分布式执行计算而不返回数据
Dask distributed perform computations without returning data
我有一个动态的 Dask Kubernetes 集群。
我想从 Gcloud 存储中加载 35 个镶木地板文件(约 1.2GB)到 Dask Dataframe,然后用 apply()
处理它,然后将结果保存到镶木地板文件到 Gcloud。
在从 Gcloud 存储加载文件期间,集群内存使用量增加到大约 3-4GB。然后工作人员(每个工作人员有 2GB 的 RAM)是 terminated/restarted 并且一些任务丢失了,
所以集群开始循环计算相同的东西。
我删除了 apply()
操作,只留下 read_parquet()
进行测试
如果我的自定义代码引起了麻烦,但问题是一样的,即使只有一个 read_parquet()
操作。这是一个代码:
client = Client('<ip>:8786')
client.restart()
def command():
client = get_client()
df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token':'cloud'}, engine='fastparquet')
df = df.compute()
x = client.submit(command)
x.result()
注意:我正在向 运行 所有必要的命令提交一个命令函数,以避免集群内的 gcsfs 身份验证出现问题
经过一些调查,我了解到问题可能出在 .compute()
中,其中 return 将所有数据发送到一个进程,但是这个进程(我的命令函数)正在 运行ning一位工人。因此,工作人员没有足够的 RAM,崩溃并丢失所有触发任务重新计算的任务运行。
我的目标是:
- 从 parquet 文件中读取
- 使用
apply()
执行一些计算
- 并且甚至没有 return 将集群中的数据以 parquet 格式写回 Gcloud 存储。
所以,我只想将数据保存在集群上,而不是 return 返回。只需在其他地方计算和保存数据。
阅读 Dask 分布式文档后,我找到了 client.persist()/compute()
和 .scatter()
方法。它们看起来像我需要的,但我真的不知道如何使用它们。
你能帮我用 client.persist()
和 client.compute()
方法来举例吗
或者建议另一种方法来做到这一点?非常感谢!
达斯克版本:0.19.1
Dask分布式版本:1.23.1
Python版本:3.5.1
df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token':'cloud'}, engine='fastparquet')
df = df.compute() # this triggers computations, but brings all of the data to one machine and creates a Pandas dataframe
df = df.persist() # this triggers computations, but keeps all of the data in multiple pandas dataframes spread across multiple machines
我有一个动态的 Dask Kubernetes 集群。
我想从 Gcloud 存储中加载 35 个镶木地板文件(约 1.2GB)到 Dask Dataframe,然后用 apply()
处理它,然后将结果保存到镶木地板文件到 Gcloud。
在从 Gcloud 存储加载文件期间,集群内存使用量增加到大约 3-4GB。然后工作人员(每个工作人员有 2GB 的 RAM)是 terminated/restarted 并且一些任务丢失了,
所以集群开始循环计算相同的东西。
我删除了 apply()
操作,只留下 read_parquet()
进行测试
如果我的自定义代码引起了麻烦,但问题是一样的,即使只有一个 read_parquet()
操作。这是一个代码:
client = Client('<ip>:8786')
client.restart()
def command():
client = get_client()
df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token':'cloud'}, engine='fastparquet')
df = df.compute()
x = client.submit(command)
x.result()
注意:我正在向 运行 所有必要的命令提交一个命令函数,以避免集群内的 gcsfs 身份验证出现问题
经过一些调查,我了解到问题可能出在 .compute()
中,其中 return 将所有数据发送到一个进程,但是这个进程(我的命令函数)正在 运行ning一位工人。因此,工作人员没有足够的 RAM,崩溃并丢失所有触发任务重新计算的任务运行。
我的目标是:
- 从 parquet 文件中读取
- 使用
apply()
执行一些计算
- 并且甚至没有 return 将集群中的数据以 parquet 格式写回 Gcloud 存储。
所以,我只想将数据保存在集群上,而不是 return 返回。只需在其他地方计算和保存数据。
阅读 Dask 分布式文档后,我找到了 client.persist()/compute()
和 .scatter()
方法。它们看起来像我需要的,但我真的不知道如何使用它们。
你能帮我用 client.persist()
和 client.compute()
方法来举例吗
或者建议另一种方法来做到这一点?非常感谢!
达斯克版本:0.19.1
Dask分布式版本:1.23.1
Python版本:3.5.1
df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token':'cloud'}, engine='fastparquet')
df = df.compute() # this triggers computations, but brings all of the data to one machine and creates a Pandas dataframe
df = df.persist() # this triggers computations, but keeps all of the data in multiple pandas dataframes spread across multiple machines