更改默认的多处理 unpickler class

Change default multiprocessing unpickler class

我在设备 A 上有一个多处理程序,它使用队列和 SyncManager 使其可以通过网络访问。队列存储来自设备上模块的自定义 class,它被多处理包自动腌制为 module.class。

在另一台通过 SyncManager 读取队列的设备上,我有相同的模块作为包的一部分,而不是设备 A 上的顶级模块。这意味着我在尝试读取项目时收到 ModuleNotFoundError来自队列,因为 unpickler 不知道模块现在是 package.module.

我见过这个变通方法,它使用基于 pickler.Unpicker 的新 class 并且看起来最不老套和可扩展: 但是,我不知道如何指定要使用的多处理 unpickler class。

我看到这可以用于 reducer class 所以我假设有一种方法也可以设置 unpickler?

我从来没有见过这样做的方法。你可能不得不绕过这个。让多处理器系统认为您正在传递字节字符串或字节数组,并让您的用户代码执行 pickling 和 unpickling。

破解?是的。但并不比您已经要做的更糟。

混合使用:

我能够使用类似于以下的代码使它正常工作:

from multiprocessing.reduction import ForkingPickler, AbstractReducer
import pickle
import io

multiprocessing.context._default_context.reducer = MyPickleReducer()

class RenameUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        renamed_module = module
        if module == "old_module_name":
            renamed_module = "new_package.module_name"
        return super(RenameUnpickler, self).find_class(renamed_module, name)

class MyForkingPickler(ForkingPickler):

    # Method signature from pickle._loads       
    def loads(self, /, *, fix_imports=True, encoding="ASCII", errors="strict",
            buffers=None):
        if isinstance(s, str):
            raise TypeError("Can't load pickle from unicode string")
        file = io.BytesIO(s)
        return RenameUnpickler(file, fix_imports=fix_imports, buffers=buffers,
                        encoding=encoding, errors=errors).load()
    
class MyPickleReducer(AbstractReducer):
    ForkingPickler = MyForkingPickler
    register = MyForkingPickler.register

如果您想进一步覆盖如何执行 unpickling,这可能很有用,但在我原来的情况下,使用以下方法重定向模块可能更容易:

from new_package import module_name
sys.modules['old_module_name'] = module_name