Dask map_partitions fails when function depends on large array
I am trying to use map_partitions with a function that implicitly depends on a large object. The code looks roughly like this:
big_array = np.array(...)

def do_something(partition):
    # some operations involving partition and big_array
    ...

dask_dataframe.map_partitions(do_something)
Then I get the following error:
Traceback (most recent call last):
File "/Users/wiebuschm/.pyenv/versions/3.8.5/lib/python3.8/site-packages/distributed/protocol/core.py", line 72, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/Users/wiebuschm/.pyenv/versions/3.8.5/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 258, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 258, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 196, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
distributed.comm.utils - ERROR - bytes object is too large
Since big_array somehow needs to be shipped to all the workers, I'm willing to believe that we encounter some large bytes objects along the way. But what is the upper limit, and how can I increase it?
Here is your clue - don't write functions that close over large objects like this. The closed-over array gets serialized together with the function into every task message, which is what blows up the msgpack call above. If you can't have the workers load the array themselves, use scatter to move the array to the workers and pass it in as an explicit argument:
def do_something(partition, big_array):
    # some operations involving partition and big_array
    ...

array_on_workers = client.scatter(big_array)
dask_dataframe.map_partitions(do_something, array_on_workers)
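For context, here is a minimal end-to-end sketch of that pattern, assuming a local distributed.Client; the DataFrame contents, the column name x, and the mean-based operation are made up purely for illustration:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from distributed import Client

client = Client()  # local cluster; workers run as separate processes

# Toy stand-ins for the real data (illustrative only)
big_array = np.random.random(1_000_000)
ddf = dd.from_pandas(pd.DataFrame({"x": np.arange(100.0)}), npartitions=4)

def do_something(partition, big_array):
    # example operation: shift every value in the partition by the array's mean
    return partition + big_array.mean()

# scatter uploads big_array to the workers once and returns a Future;
# broadcast=True (optional) replicates it to every worker up front.
# map_partitions resolves the Future to the real array inside each task.
array_on_workers = client.scatter(big_array, broadcast=True)
result = ddf.map_partitions(do_something, array_on_workers, meta={"x": "f8"}).compute()

Passing the array as an explicit argument keeps it out of the pickled function, so only a small Future travels through the task graph instead of a copy of the array in every message.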