Python 不同条件下的多处理队列限制

Python Multiprocessing queue limitations in different conditions

import multiprocessing
import time


def WORK(x,q,it):
        
        for i in range(it):
            t = x + '---'+str(i)
               
            q.put(t)
      
def cons(q,cp):
    while not q.empty():
         cp.append(q.get())
    return q.put(cp)

if __name__ == '__main__':
    cp = []
    it = 600 #iteratons
    start = time.perf_counter()
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target = WORK, args = ('n',q,it))
    p2 = multiprocessing.Process(target=WORK, args=('x',q,it))
    p3 = multiprocessing.Process(target=cons, args=(q,cp,))
    
    p1.start()
    p2.start()
    p3.start()


    p1.join()
    p2.join()
    p3.join()
    print(q.get())
    end = time.perf_counter()
    print(end - start)

我在 Pycharm 和 Colab 中 运行 遇到了这个代码问题,如果我 运行 在 colab 中它只在 1000 次迭代和更少 [=22= 中工作正常]WORK() 进程,如果更多 - 它会冻结。 在 Pycharm 中,只有 500 次或更少的迭代才能正常工作 什么问题???有什么限制吗?

所以我发现不是很好的解决方案是删除连接或将其放在队列中的 dict 调用之后,它有助于获得 mor 限制,使用此代码它开始在 pycharm 中进行 1000 次迭代但 10000迭代再次陷入僵局

p1.join()
p2.join()
print(q.get())
p3.join()
end = time.perf_counter()
print(end - start)

进一步的改变帮助我通过添加 queuq maxsize 将迭代限制增加到 10000:

q = multiprocessing.Queue(maxsize = 1000)

那么这个队列的限制和规律是什么??? 如何管理无尽的队列,例如来自websockets,他们连续发送数据

您的代码有几个问题。首先,根据 multiprocessing.Queue 上的文档,方法 empty 可靠。所以在函数 cons 中语句 while not q.empty(): 是有问题的。但是即使方法 Queue.empty 是可靠的,这里还是存在 竞争条件 。您已经并行启动进程 WORKcons,其中前者正在将元素写入队列,而后者正在读取直到发现队列为空。但是,如果 consWORK 开始写入其第一个元素之前运行,它将发现队列立即为空,这不是您预期的结果。正如我在上面的评论中提到的,在检索到进程已写入的所有记录之前,您不得尝试加入正在写入队列的进程。

您遇到的另一个问题是您正在向 cons 传递一个空列表 cp,您一直在向其追加。但是 cons 是属于不同地址 space 中的进程 运行 的函数,因此它附加到的 cp 列表是 而不是 与主进程中相同的 cp 列表。请注意这一点。

最后,cons 将其结果写入它正在读取的同一队列,因此主进程正在从同一队列读取该结果。所以我们有另一个竞争条件:一旦主进程被修改为在它加入所有进程之前不从这个队列读取,主进程和 cons 现在都从同一个队列并行读取。我们现在需要一个单独的输入和输出队列,这样就不会有冲突。这解决了这个竞争条件。

为了解决第一个竞争条件,WORK 进程应该写一个特殊的 sentinel 记录作为 记录的结尾 指标。如果 None 不是有效的 normal 记录,它可能是值 None 或者它可能是 any 特殊对象不能被误认为是真实记录。由于我们有两个进程将记录写入同一个输入队列供 cons 读取,我们最终会得到两个哨兵记录,cons 将不得不寻找这些记录才能知道确实没有更多剩余记录。

import multiprocessing
import time

SENTINEL = 'SENTINEL' # or None

def WORK(x, q, it):
        for i in range(it):
            t = x + '---' + str(i)
            q.put(t)
        q.put(SENTINEL) # show end of records

def cons(q_in, q_out, cp):
    # We now are looking for two end of record indicators:
    for record in iter(q_in.get, SENTINEL):
        cp.append(record)
    for record in iter(q_in.get, SENTINEL):
        cp.append(record)
    q_out.put(cp)

if __name__ == '__main__':
    it = 600 #iteratons
    start = time.perf_counter()
    q_in = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=WORK, args = ('n', q_in, it))
    p2 = multiprocessing.Process(target=WORK, args=('x', q_in, it))
    cp = []
    p3 = multiprocessing.Process(target=cons, args=(q_in, q_out, cp))

    p1.start()
    p2.start()
    p3.start()

    cp = q_out.get()
    print(len(cp))

    p1.join()
    p2.join()
    p3.join()
    end = time.perf_counter()
    print(end - start)

打印:

1200
0.1717168