python struct.error: 'i' format requires -2147483648 <= number <= 2147483647


Question

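I want to use the multiprocessing module (multiprocessing.Pool.starmap()) for feature engineering. However, it fails with the error message below. I suspect the error is about the size of the input (2147483647 = 2^31 − 1?), because the same code runs smoothly on a small fraction (frac=0.05) of the input dataframes (train_scala, test, ts). I already downcast the dataframe dtypes to be as small as possible, but that did not help.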

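The Anaconda version is 4.3.30 and the Python version is 3.6 (64-bit). The machine has more than 128 GB of memory and more than 20 cores. Can you suggest a way to get around this problem? If it is caused by passing large data through the multiprocessing module, how small does the data need to be to use multiprocessing on Python 3?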

Code:

from multiprocessing import Pool, cpu_count
from itertools import repeat    
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)

Error message:

Traceback (most recent call last):
  File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
    print('----Pool starmap start----')
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

Additional information

Additional code: the method multiprocess_FE

import gzip

import pandas as pd


def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
    # enrich_by_features is defined elsewhere in the script
    train_dict = {}
    ts_dict = {}
    msno_dict = {}
    ul_dict = {}
    if is_train:
        train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
    else:
        train_dict[historyCutoff] = test
    msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
    print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
    ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
    print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))
    # Close the gzip handle once the CSV has been read
    with gzip.open(ul_part_path, mode="rt") as f:
        ul_part = pd.read_csv(f)  ##.sample(frac=0.01, replace=False)
    ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
    train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)

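The protocol multiprocessing uses to communicate between processes is based on pickling, and each pickled payload is prefixed with its size. For your method, all of the arguments of a call are pickled together as a single object.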
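A simplified model of that framing, as a sketch: pickle the whole argument tuple into one payload and prefix it with a 4-byte big-endian signed length, the `"!i"` header visible in the traceback. The `frame`/`unframe` helpers and the example tuple are hypothetical; the real code in `multiprocessing.connection` uses its own pickler subclass and writes to a pipe rather than building one bytes object.

```python
import pickle
import struct


def frame(obj):
    # Pickle the entire object (e.g. one task's argument tuple) as a single payload,
    # then prefix it with its length packed as a 4-byte big-endian signed int ("!i").
    payload = pickle.dumps(obj)
    return struct.pack("!i", len(payload)) + payload


def unframe(data):
    # Read the 4-byte length header, then exactly that many payload bytes.
    (size,) = struct.unpack("!i", data[:4])
    return pickle.loads(data[4:4 + size])


# Hypothetical argument tuple standing in for (historyCutoff, ..., is_train)
task = (201701, "part_0.csv.gz", True)
assert unframe(frame(task)) == task
```

Because every argument ends up in the same payload, a few large dataframes in one task tuple are enough to push the length past what the header can express.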

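Once pickled, the object you produce is larger than the maximum value of the i struct format (a four-byte signed integer, at most 2147483647), which breaks the assumption the code makes about the payload size.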
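To see the limit directly, and to check an argument tuple before handing it to a pool, the hypothetical helpers below pack a length with `"!i"` and measure a pickled size. `pickle.dumps` with the default protocol only approximates what multiprocessing sends, so treat the check as an estimate.

```python
import pickle
import struct

MAX_I = 2**31 - 1  # largest value a 4-byte signed int ("!i") can hold


def header_for(n):
    # Same packing as connection.py in Python 3.6/3.7; raises struct.error when n > MAX_I.
    return struct.pack("!i", n)


def pickled_size(obj):
    # Approximate size of the payload multiprocessing would send for obj.
    return len(pickle.dumps(obj))


def fits_in_pipe_header(obj):
    # True if the pickled object is small enough for the 4-byte length header.
    return pickled_size(obj) <= MAX_I


try:
    header_for(MAX_I + 1)
except struct.error as exc:
    print("too large for the header:", exc)
```

Note that measuring means pickling the object once in the parent, which temporarily needs as much extra memory as the pickle itself.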

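You could delegate reading the dataframes to the child processes and send only the metadata each child needs to load its data. Together your dataframes weigh in at nearly 1 GB, which is far too much data to share over a pipe between your processes.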
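A minimal sketch of that idea, using only the standard library: the hypothetical `worker` receives a cutoff and a file path, and loads the data itself, so only a tiny tuple is pickled per task. `load_rows`, `run`, and the `date` column are illustrative assumptions; with pandas you would call `pd.read_csv` inside the worker instead.

```python
import csv
from multiprocessing import Pool


def load_rows(path):
    # Hypothetical loader; in the question this would be something like
    # pd.read_csv(gzip.open(path, mode="rt")).
    with open(path, newline="") as f:
        return list(csv.DictReader(f))


def worker(cutoff, data_path):
    # Only the small (cutoff, data_path) tuple is pickled and sent to the child;
    # the large table is loaded here, inside the child process.
    rows = load_rows(data_path)
    kept = [row for row in rows if int(row["date"]) <= cutoff]
    # Return something small: results travel back over the same kind of pipe.
    return cutoff, len(kept)


def run(cutoffs, data_path, processes=4):
    tasks = [(cutoff, data_path) for cutoff in cutoffs]
    with Pool(processes) as pool:
        return pool.starmap(worker, tasks)
```

Call `run(...)` from under an `if __name__ == "__main__":` guard. Each child now holds its own copy of whatever it loads, so memory use grows with the number of processes.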

Quoting from the Programming guidelines section of the multiprocessing documentation:

Better to inherit than pickle/unpickle

When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

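If you are not running on Windows and are not using the spawn or forkserver start methods, you can load your dataframes as globals *before* starting your subprocesses. The child processes will then 'inherit' the data through the operating system's normal copy-on-write memory page sharing.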

Note that as of Python 3.8 this limit has been raised to an unsigned long long (8 bytes) on non-Windows systems, so you can now send and receive 4 EiB of data. See this commit, and Python issues #35152 and #17560.
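The wider header keeps the old format for small messages: sizes that fit in `"!i"` are sent as before, and larger ones send `-1` as a marker followed by an 8-byte unsigned `"!Q"` length, the same scheme the patch further down applies. The hypothetical `encode_length`/`decode_length` pair below isolates just that header logic as a sketch.

```python
import struct


def encode_length(n):
    # Small sizes: the original 4-byte signed header.
    # Large sizes: -1 as a marker, then the real length as an 8-byte unsigned int.
    if n > 0x7fffffff:
        return struct.pack("!i", -1) + struct.pack("!Q", n)
    return struct.pack("!i", n)


def decode_length(header):
    # Read the 4-byte header; if it is the -1 marker, read the 8-byte length after it.
    (size,) = struct.unpack("!i", header[:4])
    if size == -1:
        (size,) = struct.unpack("!Q", header[4:12])
    return size
```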

If you can't upgrade, can't use resource inheritance, and are not running on Windows, then use this patch:

import functools
import logging
import struct
import sys

logger = logging.getLogger()


def patch_mp_connection_bpo_17560():
    """Apply PR-10305 / bpo-17560 connection send/receive max size update

    See the original issue at https://bugs.python.org/issue17560 and 
    https://github.com/python/cpython/pull/10305 for the pull request.

    This only supports Python versions 3.3 - 3.7, this function
    does nothing for Python versions outside of that range.

    """
    patchname = "Multiprocessing connection patch for bpo-17560"
    if not (3, 3) < sys.version_info < (3, 8):
        logger.info(
            patchname + " not applied, not an applicable Python version: %s",
            sys.version
        )
        return

    from multiprocessing.connection import Connection

    orig_send_bytes = Connection._send_bytes
    orig_recv_bytes = Connection._recv_bytes
    if (
        orig_send_bytes.__code__.co_filename == __file__
        and orig_recv_bytes.__code__.co_filename == __file__
    ):
        logger.info(patchname + " already applied, skipping")
        return

    @functools.wraps(orig_send_bytes)
    def send_bytes(self, buf):
        n = len(buf)
        if n > 0x7fffffff:
            pre_header = struct.pack("!i", -1)
            header = struct.pack("!Q", n)
            self._send(pre_header)
            self._send(header)
            self._send(buf)
        else:
            orig_send_bytes(self, buf)

    @functools.wraps(orig_recv_bytes)
    def recv_bytes(self, maxsize=None):
        buf = self._recv(4)
        size, = struct.unpack("!i", buf.getvalue())
        if size == -1:
            buf = self._recv(8)
            size, = struct.unpack("!Q", buf.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    Connection._send_bytes = send_bytes
    Connection._recv_bytes = recv_bytes

    logger.info(patchname + " applied")

This issue was fixed in a recent Python PR: https://github.com/python/cpython/pull/10305

If you like, you can apply this change locally so that it works for you right away, without waiting for new Python and Anaconda releases.