跨多个进程使用双端队列对象

Working with deque object across multiple processes

我正在尝试减少读取包含大约 100,000 个条目的数据库的处理时间,但我需要以特定方式对它们进行格式化,为此,我尝试使用 python的 multiprocessing.map 功能完美运行,只是我似乎无法获得任何形式的队列引用来处理它们。

我一直在使用来自 Filling a queue and managing multiprocessing in python to guide me for using queues across multiple processes, and Using a global variable with a thread 的信息来指导我跨线程使用全局变量。我已经让软件工作了,但是当我在 运行 处理过程后检查 list/queue/dict/map 长度时,它总是 returns 零

我写了一个简单的例子来说明我的意思: 您必须 运行 将脚本作为文件,mapinitialize 函数在解释器中不起作用。

from multiprocessing import Pool
from collections import deque

global_q = deque()

def my_init(q):
    global global_q
    global_q = q
    q.append("Hello world")


def map_fn(i):
    global global_q
    global_q.append(i)


if __name__ == "__main__":
    with Pool(3, my_init, (global_q,)) as pool:
        pool.map(map_fn, range(3))
    for p in range(len(global_q)):
        print(global_q.pop())

理论上,当我使用 pool 函数将队列对象引用从主线程传递到工作线程,然后使用给定函数初始化该线程的全局变量,然后当我将元素插入到map 函数之后的队列,该对象引用仍应指向原始队列对象引用(长话短说,所有内容都应在同一队列中结束,因为它们都指向内存中的同一位置) .

所以,我预计:

Hello World
Hello World
Hello World
1
2
3

当然,1, 2, 3 的顺序是任意的,但是您在输出中看到的是 ''

当我将对象引用传递给 pool 函数时,为什么没有任何反应?

您不能将全局变量用于多进程。

传递给函数多处理队列。

from multiprocessing import Queue
queue= Queue() 

def worker(q):
    q.put(something)

您可能还遇到过代码没问题,但是由于池创建了单独的进程,甚至错误也被分开了,因此您看不到代码不仅不起作用,而且会抛出错误。

您的输出是 '' 的原因是因为您的 q/global_q 没有附加任何内容。如果它被附加,那么只有一些变量,可能被称为 global_q,但它与主线程 global_q 中的 global_q 完全不同

尝试在你想要多处理的函数中打印('Hello world'),你会自己看到,实际上根本没有打印任何东西。该进程只是在您的主线程之外,访问该进程的唯一方法是通过多处理队列。您通过 queue.put('something') 和 something = queue.get()

访问队列

尝试理解这段代码,你会做得很好:

import multiprocessing as mp

shared_queue = mp.Queue() # This will be shared among all procesess, but you need to pass the queue as an argument in the process. You CANNOT use it as global variable. Understand that the functions kind of run in total different processes and nothing can really access them... Except multiprocessing.Queue - that can be shared across all processes.


def channel(que,channel_num):
    que.put(channel_num)

if __name__ == '__main__':
    processes = [mp.Process(target=channel, args=(shared_queue, channel_num)) for channel_num in range(8)]

    for p in processes:
        p.start()


    for p in processes: # wait for all results to close the pool
        p.join()

    for i in range(8): # Get data from Queue. (you can get data out of it at any time actually)
        print(shared_queue.get())

下面是一个示例,说明如何通过扩展 multiprocessing.managers.BaseManager class 以支持 deques 在进程之间共享某些内容。

文档中有一个关于创建它们的 Customized managers 部分。

import collections
from multiprocessing import Pool
from multiprocessing.managers import BaseManager


class DequeManager(BaseManager):
    pass

class DequeProxy(object):
    def __init__(self, *args):
        self.deque = collections.deque(*args)
    def __len__(self):
        return self.deque.__len__()
    def appendleft(self, x):
        self.deque.appendleft(x)
    def append(self, x):
        self.deque.append(x)
    def pop(self):
        return self.deque.pop()
    def popleft(self):
        return self.deque.popleft()

# Currently only exposes a subset of deque's methods.
DequeManager.register('DequeProxy', DequeProxy,
                      exposed=['__len__', 'append', 'appendleft',
                               'pop', 'popleft'])


process_shared_deque = None  # Global only within each process.

def my_init(q):
    """ Initialize module-level global. """
    global process_shared_deque
    process_shared_deque = q
    q.append("Hello world")


def map_fn(i):
    process_shared_deque.append(i)  # deque's don't have a "put()" method.


if __name__ == "__main__":
    manager = DequeManager()
    manager.start()
    shared_deque = manager.DequeProxy()

    with Pool(3, my_init, (shared_deque,)) as pool:
        pool.map(map_fn, range(3))

    for p in range(len(shared_deque)):  # Show left-to-right contents.
        print(shared_deque.popleft())

输出:

Hello world
0
1
2
Hello world
Hello world