使用 pyarrow 在工作人员之间共享对象
Sharing objects across workers using pyarrow
我想向 multiprocessing.Pool.map()
创建的多个工作进程提供对共享 DataFrame 的只读访问权限。
我想避免复制和酸洗。
我知道 pyarrow 可以用于此目的。但是,我发现他们的文档非常繁琐。任何人都可以提供有关如何完成的示例吗?
https://github.com/apache/arrow/blob/master/python/examples/plasma/sorting/sort_df.py 处的示例是一个工作示例,它使用 Python 多处理在多个工作人员之间共享一个 Pandas 数据帧(请注意,它需要您构建一个小型 Cython 库,以便到运行它)。
数据框通过Arrow's Plasma object store共享。
如果您不依赖 Python 多处理,您可以使用 Ray 以更简单的语法执行您想要的操作。
要授予多个工作人员对 Pandas 数据框的只读访问权限,您可以执行以下操作。
import numpy as np
import pandas
import ray
ray.init()
df = pandas.DataFrame(np.random.normal(size=(1000, 10)))
@ray.remote
def f(df):
# This task will run on a worker and have read only access to the
# dataframe. For example, "df.iloc[0][0] = 1" will raise an exception.
try:
df.iloc[0][0] = 1
except ValueError:
pass
return df.iloc[0][0]
# Serialize the dataframe with pyarrow and store it in shared memory.
df_id = ray.put(df)
# Run four tasks that have access to the dataframe.
result_ids = [f.remote(df_id) for _ in range(4)]
# Get the results.
results = ray.get(result_ids)
注意df_id = ray.put(df)
这一行可以省略(也可以直接调用f.remote(df)
)。在那种情况下,df
仍然会存储在共享内存中并与worker共享,但是会存储4次(每次调用f.remote(df)
一次),效率较低。
我想向 multiprocessing.Pool.map()
创建的多个工作进程提供对共享 DataFrame 的只读访问权限。
我想避免复制和酸洗。
我知道 pyarrow 可以用于此目的。但是,我发现他们的文档非常繁琐。任何人都可以提供有关如何完成的示例吗?
https://github.com/apache/arrow/blob/master/python/examples/plasma/sorting/sort_df.py 处的示例是一个工作示例,它使用 Python 多处理在多个工作人员之间共享一个 Pandas 数据帧(请注意,它需要您构建一个小型 Cython 库,以便到运行它)。
数据框通过Arrow's Plasma object store共享。
如果您不依赖 Python 多处理,您可以使用 Ray 以更简单的语法执行您想要的操作。
要授予多个工作人员对 Pandas 数据框的只读访问权限,您可以执行以下操作。
import numpy as np
import pandas
import ray
ray.init()
df = pandas.DataFrame(np.random.normal(size=(1000, 10)))
@ray.remote
def f(df):
# This task will run on a worker and have read only access to the
# dataframe. For example, "df.iloc[0][0] = 1" will raise an exception.
try:
df.iloc[0][0] = 1
except ValueError:
pass
return df.iloc[0][0]
# Serialize the dataframe with pyarrow and store it in shared memory.
df_id = ray.put(df)
# Run four tasks that have access to the dataframe.
result_ids = [f.remote(df_id) for _ in range(4)]
# Get the results.
results = ray.get(result_ids)
注意df_id = ray.put(df)
这一行可以省略(也可以直接调用f.remote(df)
)。在那种情况下,df
仍然会存储在共享内存中并与worker共享,但是会存储4次(每次调用f.remote(df)
一次),效率较低。