JSON serialized object gives error with multiprocessing calls - TypeError: XXX objects not callable error
JSON serialized object gives error with multiprocessing calls - TypeError: XXX objects not callable error
我正在使用 JSON 序列化程序辅助函数来轻松访问字典(基本上以 JSON 形式接收)对象。
jsondict.py
"""Utilities for working with JSON and json-like structures - deeply nested Python dicts and lists
This lets us iterate over child nodes and access elements with a dot-notation.
"""
import sys
isPy3 = sys.version_info[0]==3
if isPy3:
def __alt_str__(v,enc='utf8'):
return v if isinstance(v,bytes) else v.encode(enc)
__strTypes__ = (str,bytes)
else:
__alt_str__ = unicode
__strTypes__ = (str,unicode)
class MyLocals(object):
pass
mylocals = MyLocals()
def setErrorCollect(collect):
mylocals.error_collect = collect
setErrorCollect(False)
def errorValue(x):
if isinstance(x,__strTypes__):
return repr(x) if ' ' in x else x
return 'None' if x is None else str(x)
def condJSON(v,__name__=''):
return JSONDict(v,__name__=__name__) if isinstance(v,dict) else JSONList(v,__name__=__name__) if isinstance(v,list) else v
def condJSONSafe(v,__name__=''):
return JSONDictSafe(v,__name__=__name__) if isinstance(v,dict) else JSONListSafe(v,__name__=__name__) if isinstance(v,list) else v
class JSONListIter(object):
def __init__(self, lst, conv):
self.lst = lst
self.i = -1
self.conv = conv
def __iter__(self):
return self
def next(self):
if self.i<len(self.lst)-1:
self.i += 1
return self.conv(self.lst[self.i])
else:
raise StopIteration
if isPy3:
__next__ = next
del next
class JSONList(list):
def __init__(self,v,__name__=''):
list.__init__(self,v)
self.__name__ = __name__
def __getitem__(self,x):
return condJSON(list.__getitem__(self,x),__name__='%s\t%s'%(self.__name__,errorValue(x)))
def __iter__(self):
return JSONListIter(self,condJSON)
class JSONListSafe(JSONList):
def __getitem__(self,x):
__name__='%s\t%s'%(self.__name__,errorValue(x))
try:
return condJSONSafe(list.__getitem__(self,x),__name__=__name__)
except:
if mylocals.error_collect:
mylocals.error_collect(__name__)
return JSONStrSafe('')
def __iter__(self):
return JSONListIter(self,condJSONSafe)
class JSONStrSafe(str):
def __getattr__(self, attr):
return self
__getitem__ = __getattr__
class JSONDict(dict):
"Allows dotted access"
def __new__(cls,*args,**kwds):
__name__ = kwds.pop('__name__')
self = dict.__new__(cls,*args,**kwds)
self.__name__ = __name__
return self
def __init__(self,*args,**kwds):
kwds.pop('__name__','')
dict.__init__(self,*args,**kwds)
def __getattr__(self, attr, default=None):
if attr in self:
return condJSON(self[attr],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
elif __alt_str__(attr) in self:
return condJSON(self[__alt_str__(attr)],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
elif attr=='__safe__':
return JSONDictSafe(self,__name__=self.__name__)
else:
raise AttributeError("No attribute or key named '%s'" % attr)
def sorted_items(self,accept=None, reject=lambda i: i[0]=='__name__'):
if accept or reject:
if not accept:
f = lambda i: not reject(i)
elif not reject:
f = accept
else: #both
f = lambda i: accept(i) and not reject(i)
return sorted(((k,condJSON(v,__name__==k)) for k,v in self.iteritems() if f((k,v))))
else:
return sorted(((k,condJSON(v,__name__==k)) for k,v in self.iteritems()))
def sorted_keys(self):
return sorted(self.keys())
class JSONDictSafe(JSONDict):
"Allows dotted access"
def __getattr__(self, attr, default=None):
if attr in self:
return condJSONSafe(self[attr],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
elif __alt_str__(attr) in self:
return condJSONSafe(self[__alt_str__(attr)],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
elif attr=='__safe__':
return self
else:
return JSONStrSafe('')
def __getitem__(self,x):
__name__='%s\t%s'%(self.__name__,errorValue(x))
try:
return condJSONSafe(dict.__getitem__(self,x),__name__=__name__)
except KeyError:
if mylocals.error_collect:
mylocals.error_collect(__name__)
return JSONStrSafe('')
def sorted_items(self,accept=None, reject=lambda i: i[0]=='__name__'):
if accept or reject:
if not accept:
f = lambda i: not reject(i)
elif not reject:
f = accept
else: #both
f = lambda i: accept(i) and not reject(i)
return sorted(((k,condJSONSafe(v,__name__==k)) for k,v in self.iteritems() if f((k,v))))
else:
return sorted(((k,condJSONSafe(v,__name__==k)) for k,v in self.iteritems()))
如果 JSON 对象像下面这样传递。
data = {'name': 'john', 'age': 20, 'address': {'city':'xyz', 'country':'XZ', 'zip': 1223}}
json_obj = condJSONSafe(data)
我可以使用点符号访问数据。
print(json_obj.name) --> john
print(json_obj.address.country) --> XZ
在我在代码中实现多处理以提高性能之前,它运行良好。
我已经从 JSON 中提取了一定数量的数据(在使用上述辅助函数将其作为点符号可访问数据之后)并将其存储到单独的列表中,例如列表 a、b、c。
然后,我进入多处理线程,
with mp.Pool(processes=mp.cpu_count()) as pool:
res = pool.starmap(self.process_records, zip(self.a, self.b, self.c))
pool.join()
最终得到
TypeError: 'JSONStrSafe' object is not callable
我尝试了 答案,但它对我不起作用。感谢你的帮助。提前致谢。
编辑:
重现示例:
test.py
import jsondict
import multiprocessing as mp
import itertools
def process_records(data, metadata):
print(data.name)
print(metadata)
#code to requirment
if __name__ == '__main__':
data = {
"metadata": "test_data",
"cust_list": [
{
'name': 'john',
'age': 20,
'address': {
'city':'xyz',
'country':'XZ',
'zip': 1223
}
},
{
'name': 'michal',
'age': 25,
'address': {
'city':'abc',
'country':'CX',
'zip': 3435
}
},
{
'name': 'david',
'age': 30,
'address': {
'city':'mnl',
'country':'TD',
'zip': 6767
}
}
]
}
json_obj = jsondict.condJSONSafe(data)
print(json_obj.metadata) #will print 'test_data'
print(json_obj.cust_list[0].name) #will print 'john'
print(json_obj.cust_list[2].address.city) #will print 'mnl'
with mp.Pool(processes=mp.cpu_count()) as pool:
res = pool.starmap(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) # --> not working
#res = pool.map(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) --> not working
#res = [pool.apply_async(process_records, d, json_obj.metadata) for d in json_obj.cust_list] --> not working
#apply --> not working
pool.join()
输出:
test_data
john
mnl
Traceback (most recent call last):
File "c:/Users/mohanlal/Desktop/Mock/json_err/test_app.py", line 53, in <module>
res = pool.starmap(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) # --> not working
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 268, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 608, in get
raise self._value
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 385, in _handle_tasks
put(task)
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: 'JSONStrSafe' object is not callable
尝试使用 startmap、map、apply_async、apply,所有错误均相同。
我已尝试使用上面附带的类似问题 link 中给出的解决方案。在出现此错误的地方修改如下。
import re
dunder_pattern = re.compile("__.*__")
protected_pattern = re.compile("_.*")
classJSONStrSafe(str):
def__getattr__(self, attr):
if dunder_pattern.match(attr) or protected_pattern.match(attr):
return super().__getattr__(attr)
return self
def__getstate__(self): returnself.__dict__
def__setstate__(self, d): self.__dict__.update(d)
__getitem__ = __getattr__
但问题依然存在。
正如评论中所建议的那样,我在 getattr 的所有 3 个地方都进行了更改并进行了尝试。得到如下不同的错误
Process SpawnPoolWorker-1:
Traceback (most recent call last):
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
self.run()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
return _ForkingPickler.loads(res)
File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
__name__ = kwds.pop('__name__')
Process SpawnPoolWorker-2:
Process SpawnPoolWorker-4:
Traceback (most recent call last):
Traceback (most recent call last):
KeyError: '__name__'
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
self.run()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
return _ForkingPickler.loads(res)
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
self.run()
File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
__name__ = kwds.pop('__name__')
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
KeyError: '__name__'
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
return _ForkingPickler.loads(res)
File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
__name__ = kwds.pop('__name__')
KeyError: '__name__'
问题是你处于“困境”。请原谅双关语——您遇到了 pickle 问题。当您进行多处理时,您的工作人员 functions/methods 的参数是 pickled。通常,用于序列化和反序列化状态的默认值是可以的,但在您的情况下不是这样。参见 Pickling Class Instances。序列化和反序列化对象的默认保存和加载操作是:
def save(obj):
return (obj.__class__, obj.__dict__)
def load(cls, attributes):
obj = cls.__new__(cls)
obj.__dict__.update(attributes)
return obj
请注意,在反序列化对象时,不会调用对象的 __init__
方法,而是调用其 __new__
方法,这就是问题所在。我不得不修改 class JSONDict
的 __new__
方法,以尝试识别它是通过反序列化调用的,因此关键字参数中可能不存在 '__name__'
然后必须添加到 class 自定义 __getstate__
和 __setstate__
方法来覆盖它保存和恢复对象属性的默认方式(方法 __init__
保持不变):
class JSONDict(dict):
"Allows dotted access"
def __new__(cls,*args,**kwds):
self = dict.__new__(cls,*args,**kwds)
if kwds and '__name__' in kwds:
__name__ = kwds.pop('__name__')
self.__name__ = __name__
return self
def __init__(self,*args,**kwds):
kwds.pop('__name__','')
dict.__init__(self,*args,**kwds)
def __getstate__(self):
return self.__dict__
def __setstate__(self, d):
self.__dict__ = d
""" The other methods remain unmodified """
打印:
test_data
john
mnl
john
test_data
michal
david
test_data
test_data
更新
我绞尽脑汁想知道为什么有必要提供 __getstate__
和 __setstate__
pickle 方法,因为无论如何它们所做的应该是默认操作。如果您修改程序只是为了测试酸洗,甚至没有 运行 Pool
方法通过插入以下行:
json_obj = condJSONSafe(data)
# insert this line:
import pickle; print(pickle.dumps(json_obj)); sys.exit(0)
它打印:
Traceback (most recent call last):
File "test.py", line 205, in <module>
import pickle; print('pickle'); print(pickle.dumps(json_obj)); sys.exit(0)
TypeError: 'JSONStrSafe' object is not callable
在正确的地方添加了打印语句后,很明显问题出在classJSONDictSafe
的__getattr__
方法中。当 pickle
检查 class 是否实现方法 __getstate__
和 __setstate__
时,当没有实现时 __getattr__
最终被调用并且 returns 作为这些属性的默认值为 JSONStrSafe
实例。因此,与其像我所做的那样通过定义这些方法来提供这些属性,还可以添加一个简单的检查,如下所示:
class JSONDictSafe(JSONDict):
"Allows dotted access"
def __getattr__(self, attr, default=None):
if attr in ('__getstate__', '__setstate__'):
raise AttributeError(f'Missing attribute: {attr}')
""" rest of the method is unmodified """
我正在使用 JSON 序列化程序辅助函数来轻松访问字典(基本上以 JSON 形式接收)对象。
jsondict.py
"""Utilities for working with JSON and json-like structures - deeply nested Python dicts and lists
This lets us iterate over child nodes and access elements with a dot-notation.
"""
import sys
isPy3 = sys.version_info[0]==3
if isPy3:
def __alt_str__(v,enc='utf8'):
return v if isinstance(v,bytes) else v.encode(enc)
__strTypes__ = (str,bytes)
else:
__alt_str__ = unicode
__strTypes__ = (str,unicode)
class MyLocals(object):
pass
mylocals = MyLocals()
def setErrorCollect(collect):
mylocals.error_collect = collect
setErrorCollect(False)
def errorValue(x):
if isinstance(x,__strTypes__):
return repr(x) if ' ' in x else x
return 'None' if x is None else str(x)
def condJSON(v,__name__=''):
return JSONDict(v,__name__=__name__) if isinstance(v,dict) else JSONList(v,__name__=__name__) if isinstance(v,list) else v
def condJSONSafe(v,__name__=''):
return JSONDictSafe(v,__name__=__name__) if isinstance(v,dict) else JSONListSafe(v,__name__=__name__) if isinstance(v,list) else v
class JSONListIter(object):
def __init__(self, lst, conv):
self.lst = lst
self.i = -1
self.conv = conv
def __iter__(self):
return self
def next(self):
if self.i<len(self.lst)-1:
self.i += 1
return self.conv(self.lst[self.i])
else:
raise StopIteration
if isPy3:
__next__ = next
del next
class JSONList(list):
def __init__(self,v,__name__=''):
list.__init__(self,v)
self.__name__ = __name__
def __getitem__(self,x):
return condJSON(list.__getitem__(self,x),__name__='%s\t%s'%(self.__name__,errorValue(x)))
def __iter__(self):
return JSONListIter(self,condJSON)
class JSONListSafe(JSONList):
def __getitem__(self,x):
__name__='%s\t%s'%(self.__name__,errorValue(x))
try:
return condJSONSafe(list.__getitem__(self,x),__name__=__name__)
except:
if mylocals.error_collect:
mylocals.error_collect(__name__)
return JSONStrSafe('')
def __iter__(self):
return JSONListIter(self,condJSONSafe)
class JSONStrSafe(str):
def __getattr__(self, attr):
return self
__getitem__ = __getattr__
class JSONDict(dict):
"Allows dotted access"
def __new__(cls,*args,**kwds):
__name__ = kwds.pop('__name__')
self = dict.__new__(cls,*args,**kwds)
self.__name__ = __name__
return self
def __init__(self,*args,**kwds):
kwds.pop('__name__','')
dict.__init__(self,*args,**kwds)
def __getattr__(self, attr, default=None):
if attr in self:
return condJSON(self[attr],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
elif __alt_str__(attr) in self:
return condJSON(self[__alt_str__(attr)],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
elif attr=='__safe__':
return JSONDictSafe(self,__name__=self.__name__)
else:
raise AttributeError("No attribute or key named '%s'" % attr)
def sorted_items(self,accept=None, reject=lambda i: i[0]=='__name__'):
if accept or reject:
if not accept:
f = lambda i: not reject(i)
elif not reject:
f = accept
else: #both
f = lambda i: accept(i) and not reject(i)
return sorted(((k,condJSON(v,__name__==k)) for k,v in self.iteritems() if f((k,v))))
else:
return sorted(((k,condJSON(v,__name__==k)) for k,v in self.iteritems()))
def sorted_keys(self):
return sorted(self.keys())
class JSONDictSafe(JSONDict):
"Allows dotted access"
def __getattr__(self, attr, default=None):
if attr in self:
return condJSONSafe(self[attr],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
elif __alt_str__(attr) in self:
return condJSONSafe(self[__alt_str__(attr)],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
elif attr=='__safe__':
return self
else:
return JSONStrSafe('')
def __getitem__(self,x):
__name__='%s\t%s'%(self.__name__,errorValue(x))
try:
return condJSONSafe(dict.__getitem__(self,x),__name__=__name__)
except KeyError:
if mylocals.error_collect:
mylocals.error_collect(__name__)
return JSONStrSafe('')
def sorted_items(self,accept=None, reject=lambda i: i[0]=='__name__'):
if accept or reject:
if not accept:
f = lambda i: not reject(i)
elif not reject:
f = accept
else: #both
f = lambda i: accept(i) and not reject(i)
return sorted(((k,condJSONSafe(v,__name__==k)) for k,v in self.iteritems() if f((k,v))))
else:
return sorted(((k,condJSONSafe(v,__name__==k)) for k,v in self.iteritems()))
如果 JSON 对象像下面这样传递。
data = {'name': 'john', 'age': 20, 'address': {'city':'xyz', 'country':'XZ', 'zip': 1223}}
json_obj = condJSONSafe(data)
我可以使用点符号访问数据。
print(json_obj.name) --> john
print(json_obj.address.country) --> XZ
在我在代码中实现多处理以提高性能之前,它运行良好。
我已经从 JSON 中提取了一定数量的数据(在使用上述辅助函数将其作为点符号可访问数据之后)并将其存储到单独的列表中,例如列表 a、b、c。
然后,我进入多处理线程,
with mp.Pool(processes=mp.cpu_count()) as pool:
res = pool.starmap(self.process_records, zip(self.a, self.b, self.c))
pool.join()
最终得到
TypeError: 'JSONStrSafe' object is not callable
我尝试了
编辑: 重现示例:
test.py
import jsondict
import multiprocessing as mp
import itertools
def process_records(data, metadata):
print(data.name)
print(metadata)
#code to requirment
if __name__ == '__main__':
data = {
"metadata": "test_data",
"cust_list": [
{
'name': 'john',
'age': 20,
'address': {
'city':'xyz',
'country':'XZ',
'zip': 1223
}
},
{
'name': 'michal',
'age': 25,
'address': {
'city':'abc',
'country':'CX',
'zip': 3435
}
},
{
'name': 'david',
'age': 30,
'address': {
'city':'mnl',
'country':'TD',
'zip': 6767
}
}
]
}
json_obj = jsondict.condJSONSafe(data)
print(json_obj.metadata) #will print 'test_data'
print(json_obj.cust_list[0].name) #will print 'john'
print(json_obj.cust_list[2].address.city) #will print 'mnl'
with mp.Pool(processes=mp.cpu_count()) as pool:
res = pool.starmap(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) # --> not working
#res = pool.map(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) --> not working
#res = [pool.apply_async(process_records, d, json_obj.metadata) for d in json_obj.cust_list] --> not working
#apply --> not working
pool.join()
输出:
test_data
john
mnl
Traceback (most recent call last):
File "c:/Users/mohanlal/Desktop/Mock/json_err/test_app.py", line 53, in <module>
res = pool.starmap(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) # --> not working
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 268, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 608, in get
raise self._value
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 385, in _handle_tasks
put(task)
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: 'JSONStrSafe' object is not callable
尝试使用 startmap、map、apply_async、apply,所有错误均相同。
我已尝试使用上面附带的类似问题 link 中给出的解决方案。在出现此错误的地方修改如下。
import re
dunder_pattern = re.compile("__.*__")
protected_pattern = re.compile("_.*")
classJSONStrSafe(str):
def__getattr__(self, attr):
if dunder_pattern.match(attr) or protected_pattern.match(attr):
return super().__getattr__(attr)
return self
def__getstate__(self): returnself.__dict__
def__setstate__(self, d): self.__dict__.update(d)
__getitem__ = __getattr__
但问题依然存在。
正如评论中所建议的那样,我在 getattr 的所有 3 个地方都进行了更改并进行了尝试。得到如下不同的错误
Process SpawnPoolWorker-1:
Traceback (most recent call last):
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
self.run()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
return _ForkingPickler.loads(res)
File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
__name__ = kwds.pop('__name__')
Process SpawnPoolWorker-2:
Process SpawnPoolWorker-4:
Traceback (most recent call last):
Traceback (most recent call last):
KeyError: '__name__'
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
self.run()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
return _ForkingPickler.loads(res)
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
self.run()
File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
__name__ = kwds.pop('__name__')
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
KeyError: '__name__'
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
return _ForkingPickler.loads(res)
File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
__name__ = kwds.pop('__name__')
KeyError: '__name__'
问题是你处于“困境”。请原谅双关语——您遇到了 pickle 问题。当您进行多处理时,您的工作人员 functions/methods 的参数是 pickled。通常,用于序列化和反序列化状态的默认值是可以的,但在您的情况下不是这样。参见 Pickling Class Instances。序列化和反序列化对象的默认保存和加载操作是:
def save(obj):
return (obj.__class__, obj.__dict__)
def load(cls, attributes):
obj = cls.__new__(cls)
obj.__dict__.update(attributes)
return obj
请注意,在反序列化对象时,不会调用对象的 __init__
方法,而是调用其 __new__
方法,这就是问题所在。我不得不修改 class JSONDict
的 __new__
方法,以尝试识别它是通过反序列化调用的,因此关键字参数中可能不存在 '__name__'
然后必须添加到 class 自定义 __getstate__
和 __setstate__
方法来覆盖它保存和恢复对象属性的默认方式(方法 __init__
保持不变):
class JSONDict(dict):
"Allows dotted access"
def __new__(cls,*args,**kwds):
self = dict.__new__(cls,*args,**kwds)
if kwds and '__name__' in kwds:
__name__ = kwds.pop('__name__')
self.__name__ = __name__
return self
def __init__(self,*args,**kwds):
kwds.pop('__name__','')
dict.__init__(self,*args,**kwds)
def __getstate__(self):
return self.__dict__
def __setstate__(self, d):
self.__dict__ = d
""" The other methods remain unmodified """
打印:
test_data
john
mnl
john
test_data
michal
david
test_data
test_data
更新
我绞尽脑汁想知道为什么有必要提供 __getstate__
和 __setstate__
pickle 方法,因为无论如何它们所做的应该是默认操作。如果您修改程序只是为了测试酸洗,甚至没有 运行 Pool
方法通过插入以下行:
json_obj = condJSONSafe(data)
# insert this line:
import pickle; print(pickle.dumps(json_obj)); sys.exit(0)
它打印:
Traceback (most recent call last):
File "test.py", line 205, in <module>
import pickle; print('pickle'); print(pickle.dumps(json_obj)); sys.exit(0)
TypeError: 'JSONStrSafe' object is not callable
在正确的地方添加了打印语句后,很明显问题出在classJSONDictSafe
的__getattr__
方法中。当 pickle
检查 class 是否实现方法 __getstate__
和 __setstate__
时,当没有实现时 __getattr__
最终被调用并且 returns 作为这些属性的默认值为 JSONStrSafe
实例。因此,与其像我所做的那样通过定义这些方法来提供这些属性,还可以添加一个简单的检查,如下所示:
class JSONDictSafe(JSONDict):
"Allows dotted access"
def __getattr__(self, attr, default=None):
if attr in ('__getstate__', '__setstate__'):
raise AttributeError(f'Missing attribute: {attr}')
""" rest of the method is unmodified """