如何使自定义对象可用于传递给 dask df.apply 的函数(无法序列化)
How to make custom object available for function passed to dask df.apply (cannot serialize)
所有这些代码都可以在 pandas 中运行,但是 运行 单线程速度很慢。
我有一个创建速度很慢的对象(它是布隆过滤器)。
我的代码看起来像这样:
def has_match(row, my_filter):
return my_filter.matches(
a=row.a, b =row.b
)
# ....make dask dataframe ddf
ddf['match'] = ddf.apply(has_match, args=(my_filter, ), axis=1, meta=(bool))
ddf.compute()
当我尝试 运行 这个时,我收到一个错误,开始:
distributed.protocol.core - CRITICAL - Failed to Serialize
我的对象是从 C 库创建的,所以我对它不能自动序列化并不感到惊讶,但我不知道如何解决这个问题。
仅使用线程
一种方法是完全避免该问题,根本不使用单独的进程。这样你就不需要在它们之间序列化数据。
ddf.compute(scheduler='threads')
虽然这确实限制了您在一台机器上的单个进程中 运行,但这可能不是您想要的。
找出如何序列化您的对象
如果你能弄清楚如何将你的对象转换成字节串并返回,那么你可以在你的对象上实现 pickle 协议(比如 __getstate__
和 __setstate__
方法,参见 Python 文档)或者您可以向 dask_serialize 和 dask_deserialize 可调度函数添加定义。有关示例,请参阅 Dask's serialization docs。
每次都重新创建对象
也许很难序列化您的对象,但每个分区重新创建一次它很便宜?
def has_match(partition):
my_filter = make_filter(...)
return partition.apply(my_filter.matches(a=row.a, b =row.b))
ddf['match'] = ddf.map_partitions(has_match)
分布式期望所有中间结果都是可序列化的。在您的情况下,您有一个不实现 pickle 的对象。一般来说,你在这里有几个选择(恕我直言,从最好到最差):
为此对象执行 pickle。请注意,使用 copyreg 模块,您可以为不在您控制范围内的 classes 添加 pickle 支持。
在您的函数中手动缓存过滤器的创建。您可以使用对象或模块中的全局变量来执行此操作。请注意,下面的代码需要是导入模块的一部分,而不是交互式会话的一部分(即不在 jupyter notebook/ipython 会话中)。
例如(未经测试):
myfilter = None
def get_or_load():
global myfilter
if myfilter is None:
myfilter = load_filter()
else:
return myfilter
def load_filter():
pass
def has_match(row):
my_filter = get_or_load()
return my_filter.matches(a=row.a, b=row.b)
然后在您的用户代码中:
from my_filter_utils import has_match
ddf['match'] = ddf.apply(has_match, axis=1, meta=('matches', bool))
- 使用dask 管理缓存。为此,将对象包装在另一个 class 中,以便在序列化时重新加载对象。如果你随后将该对象持久化到集群中,dask 将保留它并且最多将在每个节点上调用一次创建函数。
例如(未经测试):
from dask import delayed
class Wrapper(object):
def __init__(self, func):
self.func = func
self.filter = func()
def __reduce__(self):
# When unpickled, the filter will be reloaded
return (Wrapper, (func,))
def load_filter():
pass
# Create a delayed function to load the filter
wrapper = delayed(Wrapper)(load_filter)
# Optionally persist the wrapper in the cluster, to be reused over multiple computations
wrapper = wrapper.persist()
def has_match(row, wrapper):
return wrapper.filter.matches(a=row.a, b=row.b)
ddf['match'] = ddf.apply(has_match, args=(wrapper,), axis=1, meta=('matches', bool))
所有这些代码都可以在 pandas 中运行,但是 运行 单线程速度很慢。
我有一个创建速度很慢的对象(它是布隆过滤器)。
我的代码看起来像这样:
def has_match(row, my_filter):
return my_filter.matches(
a=row.a, b =row.b
)
# ....make dask dataframe ddf
ddf['match'] = ddf.apply(has_match, args=(my_filter, ), axis=1, meta=(bool))
ddf.compute()
当我尝试 运行 这个时,我收到一个错误,开始:
distributed.protocol.core - CRITICAL - Failed to Serialize
我的对象是从 C 库创建的,所以我对它不能自动序列化并不感到惊讶,但我不知道如何解决这个问题。
仅使用线程
一种方法是完全避免该问题,根本不使用单独的进程。这样你就不需要在它们之间序列化数据。
ddf.compute(scheduler='threads')
虽然这确实限制了您在一台机器上的单个进程中 运行,但这可能不是您想要的。
找出如何序列化您的对象
如果你能弄清楚如何将你的对象转换成字节串并返回,那么你可以在你的对象上实现 pickle 协议(比如 __getstate__
和 __setstate__
方法,参见 Python 文档)或者您可以向 dask_serialize 和 dask_deserialize 可调度函数添加定义。有关示例,请参阅 Dask's serialization docs。
每次都重新创建对象
也许很难序列化您的对象,但每个分区重新创建一次它很便宜?
def has_match(partition):
my_filter = make_filter(...)
return partition.apply(my_filter.matches(a=row.a, b =row.b))
ddf['match'] = ddf.map_partitions(has_match)
分布式期望所有中间结果都是可序列化的。在您的情况下,您有一个不实现 pickle 的对象。一般来说,你在这里有几个选择(恕我直言,从最好到最差):
为此对象执行 pickle。请注意,使用 copyreg 模块,您可以为不在您控制范围内的 classes 添加 pickle 支持。
在您的函数中手动缓存过滤器的创建。您可以使用对象或模块中的全局变量来执行此操作。请注意,下面的代码需要是导入模块的一部分,而不是交互式会话的一部分(即不在 jupyter notebook/ipython 会话中)。
例如(未经测试):
myfilter = None
def get_or_load():
global myfilter
if myfilter is None:
myfilter = load_filter()
else:
return myfilter
def load_filter():
pass
def has_match(row):
my_filter = get_or_load()
return my_filter.matches(a=row.a, b=row.b)
然后在您的用户代码中:
from my_filter_utils import has_match
ddf['match'] = ddf.apply(has_match, axis=1, meta=('matches', bool))
- 使用dask 管理缓存。为此,将对象包装在另一个 class 中,以便在序列化时重新加载对象。如果你随后将该对象持久化到集群中,dask 将保留它并且最多将在每个节点上调用一次创建函数。
例如(未经测试):
from dask import delayed
class Wrapper(object):
def __init__(self, func):
self.func = func
self.filter = func()
def __reduce__(self):
# When unpickled, the filter will be reloaded
return (Wrapper, (func,))
def load_filter():
pass
# Create a delayed function to load the filter
wrapper = delayed(Wrapper)(load_filter)
# Optionally persist the wrapper in the cluster, to be reused over multiple computations
wrapper = wrapper.persist()
def has_match(row, wrapper):
return wrapper.filter.matches(a=row.a, b=row.b)
ddf['match'] = ddf.apply(has_match, args=(wrapper,), axis=1, meta=('matches', bool))