Error using pexpect and multiprocessing? error "TypeError: cannot serialize '_io.TextIOWrapper' object"

I have a Python 3.7 script on a Linux machine where I'm trying to run a function in multiple threads, but when I try I get the following error:

Traceback (most recent call last):
  File "./test2.py", line 43, in <module>
    pt.ping_scanx()
  File "./test2.py", line 39, in ping_scanx
    par = Parallel(function=self.pingx, parameter_list=list, thread_limit=10)
  File "./test2.py", line 19, in __init__
    self._x = self._pool.starmap(function, parameter_list, chunksize=1)
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 276, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 657, in get
    raise self._value
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
    put(task)
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object

Here is the sample code I'm using to demonstrate the issue:

#!/usr/local/bin/python3.7
from multiprocessing import Pool
import pexpect   # Used to run SSH for sessions

class Parallel:

    def __init__(self, function, parameter_list, thread_limit=4):

        # Create a pool of worker processes for our jobs
        self._pool = Pool(processes=thread_limit)

        self._x = self._pool.starmap(function, parameter_list, chunksize=1)

class PingTest():

    def __init__(self):
        self._pex = None

    def connect(self):
        self._pex = pexpect.spawn("ssh snorton@127.0.0.1")

    def pingx(self, target_ip, source_ip):
        print("PING {} {}".format(target_ip, source_ip))

    def ping_scanx(self):

        self.connect()

        params = [['8.8.8.8', '96.53.16.93'],
                  ['8.8.8.8', '96.53.16.93']]

        par = Parallel(function=self.pingx, parameter_list=params, thread_limit=10)


pt = PingTest()
pt.ping_scanx()

If I don't include the line with pexpect.spawn, the error doesn't occur. Can someone explain why I'm getting this error, and suggest a way to fix it?

With multiprocessing.Pool you are actually calling the function in separate processes, not threads. Processes cannot share Python objects unless those objects are serialized before being transferred over an inter-process communication channel, which is what multiprocessing.Pool does for you behind the scenes, using pickle as the serializer. Since pexpect.spawn opens the terminal device as a file-like TextIOWrapper object, and you store the returned object in the PingTest instance and then pass the bound method self.pingx to Pool.starmap, the pool tries to serialize self, which contains the pexpect.spawn object in its _pex attribute. That object unfortunately cannot be serialized, because TextIOWrapper does not support serialization.
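You can reproduce the failure in isolation, without pexpect or multiprocessing at all. A minimal sketch, using an ordinary open text file as a stand-in (any TextIOWrapper behaves the same as the one held by pexpect.spawn):

```python
import pickle

# Any open text file is an _io.TextIOWrapper, just like the terminal
# device that pexpect.spawn opens. Pickling it raises the same
# TypeError that Pool.starmap hits when it tries to serialize self.
f = open(__file__)
try:
    pickle.dumps(f)
except TypeError as e:
    print("pickle failed:", e)
finally:
    f.close()
```

(The exact error message varies slightly between Python versions; 3.7 says "cannot serialize", later versions say "cannot pickle".)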

Since your function is I/O-bound, you should use threading instead, via the multiprocessing.dummy module, for more efficient parallelization and, more importantly in this case, to allow the pexpect.spawn object to be shared across the threads, with no serialization needed.

Change:

from multiprocessing import Pool

to:

from multiprocessing.dummy import Pool
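With that one-line change, a bound method on an object holding an unpicklable handle can be submitted to the pool, because threads share memory and nothing is ever pickled. A minimal sketch, again using an open file as a stand-in for the pexpect.spawn session:

```python
from multiprocessing.dummy import Pool  # thread-based, same Pool API

class PingTest:
    def __init__(self):
        # Stand-in for the pexpect.spawn session: an open file handle
        # that, like spawn, could never survive pickling.
        self._pex = open(__file__)

    def pingx(self, target_ip, source_ip):
        return "PING {} {}".format(target_ip, source_ip)

pt = PingTest()
params = [['8.8.8.8', '96.53.16.93'],
          ['8.8.8.8', '96.53.16.93']]
with Pool(processes=10) as pool:
    # starmap works exactly as before, but self._pex is shared, not pickled
    results = pool.starmap(pt.pingx, params, chunksize=1)
pt._pex.close()
print(results)
```

This runs without any serialization error and collects both "PING ..." strings, where the process-based Pool would have raised the TypeError above.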

Demo: https://repl.it/@blhsing/WiseYoungExperiments