Dask 分布式执行计算而不返回数据

Dask distributed perform computations without returning data

我有一个动态的 Dask Kubernetes 集群。 我想从 Gcloud 存储中加载 35 个镶木地板文件(约 1.2GB)到 Dask Dataframe,然后用 apply() 处理它,然后将结果保存到镶木地板文件到 Gcloud。

在从 Gcloud 存储加载文件期间,集群内存使用量增加到大约 3-4GB。然后工作人员(每个工作人员有 2GB 的 RAM)是 terminated/restarted 并且一些任务丢失了, 所以集群开始循环计算相同的东西。 我删除了 apply() 操作,只留下 read_parquet() 进行测试 如果我的自定义代码引起了麻烦,但问题是一样的,即使只有一个 read_parquet() 操作。这是一个代码:

client = Client('<ip>:8786')
client.restart()

def command():
    client = get_client()
    df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token':'cloud'}, engine='fastparquet')
    df = df.compute()

x = client.submit(command)
x.result()

注意:我正在向 运行 所有必要的命令提交一个命令函数,以避免集群内的 gcsfs 身份验证出现问题

经过一些调查,我了解到问题可能出在 .compute() 中,其中 return 将所有数据发送到一个进程,但是这个进程(我的命令函数)正在 运行ning一位工人。因此,工作人员没有足够的 RAM,崩溃并丢失所有触发任务重新计算的任务运行。

我的目标是:

所以,我只想将数据保存在集群上,而不是 return 返回。只需在其他地方计算和保存数据。

阅读 Dask 分布式文档后,我找到了 client.persist()/compute().scatter() 方法。它们看起来像我需要的,但我真的不知道如何使用它们。

你能帮我用 client.persist()client.compute() 方法来举例吗 或者建议另一种方法来做到这一点?非常感谢!

达斯克版本:0.19.1

Dask分布式版本:1.23.1

Python版本:3.5.1

df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token':'cloud'}, engine='fastparquet')
df = df.compute()  # this triggers computations, but brings all of the data to one machine and creates a Pandas dataframe

df = df.persist()  # this triggers computations, but keeps all of the data in multiple pandas dataframes spread across multiple machines