如何使自定义对象可用于传递给 dask df.apply 的函数(无法序列化)

How to make custom object available for function passed to dask df.apply (cannot serialize)

所有这些代码都可以在 pandas 中运行,但是 运行 单线程速度很慢。

我有一个创建速度很慢的对象(它是布隆过滤器)。

我的代码看起来像这样:

def has_match(row, my_filter):
    return my_filter.matches(
        a=row.a, b =row.b
    )

# ....make dask dataframe ddf

ddf['match'] = ddf.apply(has_match, args=(my_filter, ), axis=1, meta=(bool))
ddf.compute()

当我尝试 运行 这个时,我收到一个错误,开始:

distributed.protocol.core - CRITICAL - Failed to Serialize

我的对象是从 C 库创建的,所以我对它不能自动序列化并不感到惊讶,但我不知道如何解决这个问题。

仅使用线程

一种方法是完全避免该问题,根本不使用单独的进程。这样你就不需要在它们之间序列化数据。

ddf.compute(scheduler='threads')

虽然这确实限制了您在一台机器上的单个进程中 运行,但这可能不是您想要的。

找出如何序列化您的对象

如果你能弄清楚如何将你的对象转换成字节串并返回,那么你可以在你的对象上实现 pickle 协议(比如 __getstate____setstate__ 方法,参见 Python 文档)或者您可以向 dask_serialize 和 dask_deserialize 可调度函数添加定义。有关示例,请参阅 Dask's serialization docs

每次都重新创建对象

也许很难序列化您的对象,但每个分区重新创建一次它很便宜?

def has_match(partition):
    my_filter = make_filter(...)
    return partition.apply(my_filter.matches(a=row.a, b =row.b))

ddf['match'] = ddf.map_partitions(has_match)

分布式期望所有中间结果都是可序列化的。在您的情况下,您有一个不实现 pickle 的对象。一般来说,你在这里有几个选择(恕我直言,从最好到最差):

  • 为此对象执行 pickle。请注意,使用 copyreg 模块,您可以为不在您控制范围内的 classes 添加 pickle 支持。

  • 在您的函数中手动缓存过滤器的创建。您可以使用对象或模块中的全局变量来执行此操作。请注意,下面的代码需要是导入模块的一部分,而不是交互式会话的一部分(即不在 jupyter notebook/ipython 会话中)。

例如(未经测试):

myfilter = None


def get_or_load():
    global myfilter
    if myfilter is None:
        myfilter = load_filter()
    else:
        return myfilter


def load_filter():
    pass


def has_match(row):
    my_filter = get_or_load()
    return my_filter.matches(a=row.a, b=row.b)

然后在您的用户代码中:

from my_filter_utils import has_match

ddf['match'] = ddf.apply(has_match, axis=1, meta=('matches', bool))
  • 使用dask 管理缓存。为此,将对象包装在另一个 class 中,以便在序列化时重新加载对象。如果你随后将该对象持久化到集群中,dask 将保留它并且最多将在每个节点上调用一次创建函数。

例如(未经测试):

from dask import delayed

class Wrapper(object):
    def __init__(self, func):
        self.func = func
        self.filter = func()

    def __reduce__(self):
        # When unpickled, the filter will be reloaded
        return (Wrapper, (func,))


def load_filter():
    pass


# Create a delayed function to load the filter
wrapper = delayed(Wrapper)(load_filter)

# Optionally persist the wrapper in the cluster, to be reused over multiple computations
wrapper = wrapper.persist()

def has_match(row, wrapper):
    return wrapper.filter.matches(a=row.a, b=row.b)


ddf['match'] = ddf.apply(has_match, args=(wrapper,), axis=1, meta=('matches', bool))