当函数依赖于大数组时,Dask map_partitions 失败

Dask map_partitions fails when function depends on large array

我正在尝试将 map_partitions 与隐式依赖于大对象的函数一起使用。代码看起来像这样:

big_array = np.array(...)

def do_something(partition):
    # some operations involving partition and big_array

dask_dataframe.map_partitions(do_something)

然后我收到以下错误:

Traceback (most recent call last):
  File "/Users/wiebuschm/.pyenv/versions/3.8.5/lib/python3.8/site-packages/distributed/protocol/core.py", line 72, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/Users/wiebuschm/.pyenv/versions/3.8.5/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 258, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 258, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 196, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
distributed.comm.utils - ERROR - bytes object is too large

由于 big_array 需要以某种方式将其发送给所有工作人员,我愿意相信我们在此过程中会遇到一些大字节对象。但上限是多少?我怎样才能增加它?

Since big_array somehow needs to be shipped to all the workers I'm willing to believe that we encounter some large bytes objects along the way.

这是您的线索 - 不要像这样定义大型函数。如果你不能让他们加载数组,你应该使用 scatter 将你的数组移动到工作人员。

def do_something(partition, big_array):
    # some operations involving partition and big_array

array_on_workers = client.scatter(big_array)
dask_dataframe.map_partitions(do_something, array_on_workers)