Python 多线程通信效率

Python multi-thread communication efficiency

我是 python 多任务处理的新手。我正在以老式的方式进行操作:

我继承自 threading.Thread 并使用 queue.Queue 队列发送消息 to/from 主线程。

这是我的基础线程 class:

class WorkerGenerico(threading.Thread):
    def __init__(self, task_id, input_q=None, output_q=None, keep_alive=300):
        super(WorkerGenerico, self).__init__()
        self._task_id = task_id
        if input_q is None:
            self._input_q = queue.Queue()
        else:
            if isinstance(input_q, queue.Queue):
                self._input_q = input_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        if output_q is None:
            self._output_q = queue.Queue()
        else:
            if isinstance(output_q, queue.Queue):
                self._output_q = output_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        if not isinstance(keep_alive, int):
            raise TypeError("El valor de keep_alive debe der un int.")
        self._keep_alive = keep_alive
        self.stoprequest = threading.Event()

    # def run(self):
    #    Implement a loop in subclases which checks if self.has_orden_parada() is true in order to stop.

    def join(self, timeout=None):
        self.stoprequest.set()
        super(WorkerGenerico, self).join(timeout)

    def gracefull_stop(self):
        self.stoprequest.set()

    def has_orden_parada(self):
        return self.stoprequest.is_set()

    def put(self,texto, block=True, timeout=None):
        return self._input_q.put(texto, block=block, timeout=timeout)

    def get(self, block=True, timeout=None):
        return self._output_q.get(block=block, timeout=timeout)

我的问题是,与在主线程中存储队列并使用 Queue.get() 相比,从外部调用 WorkerGenerico.get() 的开销有多大。 这两种方法在性能上看起来相似,但都带有少量不频繁的控制消息,但是,我想非常频繁的调用会使方法 B 值得使用。:

我猜想模式 A 更耗资源(它必须以某种方式从外线程调用方法并将队列定义传回,我想损失取决于 Python 实现),但是,最终代码更具可读性和直观性。

如果我必须根据其他语言的经验来判断,我会说方法 B 要好得多,对吗?

方法一:

def main()
    worker = WorkerGenerico(task_id=1)
    worker.start()
    print(worker.get())

方法 B:

def main()
    input_q = Queue()
    output_q = Queue()
    worker = WorkerGenerico(task_id=1, input_q=input_q, output_q=output_q)
    worker.start()
    print(output_q.get())

顺便说一句:为了完整起见,我想分享一下我现在的做法。它混合了两种方法,为线程提供了一个很好的信封:

class EnvoltorioWorker:
    def __init__(self, task_id, input_q=None, output_q=None, keep_alive=300):
        if input_q is None:
            self._input_q = queue.Queue()
        else:
            if isinstance(input_q, queue.Queue):
                self._input_q = input_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        if output_q is None:
            self._output_q = queue.Queue()
        else:
            if isinstance(output_q, queue.Queue):
                self._output_q = output_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        self.worker = WorkerGenerico(task_id, input_q, output_q, keep_alive)

    def put(self, elem, block=True, timeout=None):
        return self._input_q.put(elem, block=block, timeout=timeout)

    def get(self, block=True, timeout=None):
        return self._output_q.get(block=block, timeout=timeout)

我使用 EnvoltorioWorker.worker.* 调用连接或其他外部控制方法,并使用 EnvoltorioWorker.get / EnvoltorioWorker.put 与内部 class 正确通信,就像这样:

def main()
    worker_container = EnvoltorioWorker(task_id=1)
    worker_container.worker.start()
    print(worker_container.get())

通常情况下,如果不需要对 worker 的其他访问权限,我也会在 EnvoltorioWorker 中为 start()、join() 和 nonwait_stop() 创建接口。

它可能看起来很虚,而且可能有更好的方法来实现这一点,所以:

哪种方法(A 或 B)更好?从 Thread 继承是处理线程的正确方法 Python? 我在分布式环境中使用 dispycos 和类似的信封来与我的线程通信

编辑:刚刚注意到我忘记翻译 classes 中的注释和一些字符串,但它们足够简单,所以我认为它是可读的。有时间我再编辑。

有什么想法吗?

您的队列并没有真正存储在线程中。假设这里CPython,所有对象都存储在堆上,线程只有一个私有栈。堆上的对象在同一进程中的所有线程之间共享。

Memory management in Python involves a private heap containing all Python objects and data structures. The management of this private heap is ensured internally by the Python memory manager. The Python memory manager has different components which deal with various dynamic storage management aspects, like sharing, segmentation, preallocation or caching. docs

由此可见, 您的对象(您的队列)位于何处不是问题,因为它始终在堆上。 Python 中的变量(名称)只是对这些对象的 引用

这里影响您的运行时间的是您通过嵌套函数/方法调用向堆栈添加了多少个调用帧,以及您需要多少字节码指令。那么这对时序有什么影响呢?


基准

考虑以下队列和工作程序的虚拟设置。为简单起见,此处未对虚拟工作线程进行线程化,因为在我们假装只是耗尽预填充队列的情况下,对其进行线程化不会影响计时。

class Queue:
    def get(self):
        return 1

class Worker:
    def __init__(self, queue):
        self.queue = queue
        self.quick_get = self.queue.get # a reference to a method as instance attribute

    def get(self):
        return self.queue.get()

    def quick_get_method(self):
        return self.quick_get()

你怎么看,Worker 有两个版本的 get 方法,get 在某种程度上 定义它和 quick_get_method,这是一个更短的字节码指令,我们稍后会看到。 worker 实例不仅持有对 queue 实例的引用,而且还通过 self.quick_get 直接指向 queue.get,这是我们节省一条指令的地方。

现在是在 IPython 会话中从假队列对 .get() 的所有可能性进行基准测试的时间:

q = Queue()
w = Worker(q)

%timeit q.get()
285 ns ± 1.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
%timeit w.get()
609 ns ± 2.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
%timeit w.quick_get()
286 ns ± 0.756 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
%timeit w.quick_get_method()
555 ns ± 0.855 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

请注意,q.get()w.quick_get() 在时间上没有区别。 另请注意,与传统的 w.get() 相比,w.quick_get_method() 的时序有所改进。与 q.get()w.quick_get() 相比,使用 Worker-method 调用队列中的 get() 仍然几乎翻了一番。这是为什么?

通过使用 dis 模块,可以获得解释器正在处理的 Python 字节码指令的人类可读版本。

import dis

dis.dis(q.get)
  3           0 LOAD_CONST               1 (1)
              2 RETURN_VALUE

dis.dis(w.get)
  8           0 LOAD_FAST                0 (self)
              2 LOAD_ATTR                0 (queue)
              4 LOAD_METHOD              1 (get)
              6 CALL_METHOD              0
              8 RETURN_VALUE

dis.dis(w.quick_get)
  3           0 LOAD_CONST               1 (1)
              2 RETURN_VALUE

dis.dis(w.quick_get_method)
 11           0 LOAD_FAST                0 (self)
              2 LOAD_METHOD              0 (quick_get)
              4 CALL_METHOD              0
              6 RETURN_VALUE

记住我们的dummy Queue.get这里只是返回1。你看q.getw.quick_get一样,这也反映在我们之前看到的时间上.请注意 w.quick_get_method 直接加载 quick_get,这只是 queue.get 正在引用的对象的另一个名称/变量。

你还可以在dis模块的帮助下打印栈的深度:

def print_stack_depth(f):
    print(*[s for s in dis.code_info(f).split('\n') if
            s.startswith('Stack size:')]
    )

print_stack_depth(q.get)
Stack size:        1 
print_stack_depth(w.get)
Stack size:        2
print_stack_depth(w.quick_get)
Stack size:        1
print_stack_depth(w.quick_get_method)
Stack size:        2

不同方法之间的字节码和时间差异表明(不足为奇)添加另一个帧(通过添加另一种方法)对性能的影响最大。


回顾

上面的分析并不是暗示不使用额外的 Worker 方法来调用引用对象的方法 (queue.get)。为了可读性、日志记录和更容易调试,这样做是正确的做法。例如,像 Worker.quick_get_method 这样的优化,您也可以在 Stdlib 的 multiprocessing.pool.Pool 中找到,它也在内部使用队列。

从基准测试的时间来看,几百纳秒并不多(对于 Python)。在 Python 3 中,线程可以保持 GIL 的默认最大时间间隔是 5 毫秒,因此,一次执行字节码。那是 5*1000*1000 纳秒。

与无论如何引入的开销多线程相比,几百纳秒也很小。例如,我发现在一个线程中的 queue.put(integer) 之后添加 20 μs 的睡眠,而在另一个线程中仅从队列中读取,导致平均每次迭代增加约 64.0 μs 的额外开销(20 μs 睡眠不包括)超过 100k 的范围(Python 3.7.1,Ubuntu 18.04)。


设计

关于你关于设计偏好的问题,我肯定会在这里选择方法 A 而不是方法 B。更重要的是,如果你的队列不是跨多个线程使用的话。 IMO 你在最后一个片段中的混合创建在你只在内部使用 one WorkerGenerico 实例(而不是工作线程池)的情况下不必要地使事情复杂化/理解。与方法 A 相反,这里你的工人的 "threadiness" 也深埋在另一个 class.