泡菜转储的多处理队列问题

multiprocessing queue issue with pickle dumps

我已经阅读并再次阅读了关于多处理模块和队列管理的 Python 文档,但我找不到与此问题相关的任何内容,这让我发疯并阻止了我的项目:

我写了一个 'JsonLike' class 它允许我创建一个对象,例如:

a = JsonLike()
a.john.doe.is.here = True

...不考虑中间初始化(很有用)

下面的代码只是创建了这样一个对象,将其设置并插入到一个数组中,然后尝试将其发送到一个进程(这是我需要的,但是 发送对象本身会导致同样的错误)

考虑这段代码:

from multiprocessing import Process, Queue, Event

class JsonLike(dict):
    """
    This class allows json-crossing-through creation and setting such as :
    a = JsonLike()
    a.john.doe.is.here = True
    it automatically creates all the hierarchy
    """

    def __init__(self, *args, **kwargs):
        # super(JsonLike, self).__init__(*args, **kwargs)
        dict.__init__(self, *args, **kwargs)
        for arg in args:
            if isinstance(arg, dict):
                for k, v in arg.items():
                    self[k] = v
        if kwargs:
            for k, v in kwargs.items():
                self[k] = v

    def __getattr__(self, attr):
        if self.get(attr) != None:
            return attr
        else:
            newj = JsonLike()
            self.__setattr__(attr, newj)
            return newj

    def __setattr__(self, key, value):
        self.__setitem__(key, value)

    def __setitem__(self, key, value):
        dict.__setitem__(self, key, value)
        self.__dict__.update({key: value})

    def __delattr__(self, item):
        self.__delitem__(item)

    def __delitem__(self, key):
        dict.__delitem__(self, key)
        del self.__dict__[key]


def readq(q, e):
    while True:
        obj = q.get()
        print('got')
        if e.is_set():
            break


if __name__ == '__main__':
    q = Queue()
    e = Event()

    obj = JsonLike()
    obj.toto = 1

    arr=[obj]

    proc = Process(target=readq, args=(q,e))
    proc.start()
    print(f"Before sending value :{arr}")
    q.put(arr)
    print('sending done')
    e.set()
    proc.join()
    proc.close()

我得到以下输出(在 q.put 上):

Before sending value :[{'toto': 1}]
Traceback (most recent call last):
sending done
  File "/usr/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: 'JsonLike' object is not callable

有什么建议吗?

问题是你在搞乱 __getattr__。如果在此方法中添加打印语句,您将看到 运行 以下代码也会导致崩溃:

obj = JsonLike()
obj.toto.test = 1

q = Queue()
q.put(obj)
q.get()

最后一条语句将导致(重复)调用 obj.__getattr__,搜索名为 __getstate__ (it will later try to find its friend __setstate__). Here's what the pickle 的属性,文档中提到了这个 dunder 方法:

If the __getstate__() method is absent, the instance’s __dict__ is pickled as usual.

在你的情况下,问题是这个方法不存在,但你的代码让它看起来像它存在(通过动态创建一个具有正确名称的属性)。因此,不会触发默认行为,而是调用名为 __getstate__ 的空属性。问题是 __getstate__ 不是可调用的,因为它是一个空的 JsonLike 对象。这就是为什么您可能会在此处看到 "JsonLike is not callable" 弹出式错误。

一个快速解决方法是避免接触看起来像 __xx__ 甚至 _xx 的属性。就此而言,您可以 add/modify 这些行:

import re

dunder_pattern = re.compile("__.*__")
protected_pattern = re.compile("_.*")

class JsonLike(dict):

    def __getattr__(self, attr):
        if dunder_pattern.match(attr) or protected_pattern.match(attr):
            return super().__getattr__(attr)
        if self.get(attr) != None:
            return attr
        else:
            newj = JsonLike()
            self.__setattr__(attr, newj)
            return newj

这将使之前的代码工作(同样适用于您的代码)。但另一方面,你将不能再写 obj.__toto__ = 1 之类的东西了,但无论如何这可能是件好事。

我觉得您可能会在其他情况下遇到类似的错误,遗憾的是,在某些情况下您会发现库不会使用此类可预测的属性名称。这就是为什么我不建议使用这种机制 IRL 的原因之一(尽管我真的很喜欢这个想法并且我很想看看它能走多远)。