Python 不同条件下的多处理队列限制
Python Multiprocessing queue limitations in different conditions
import multiprocessing
import time
def WORK(x,q,it):
for i in range(it):
t = x + '---'+str(i)
q.put(t)
def cons(q,cp):
while not q.empty():
cp.append(q.get())
return q.put(cp)
if __name__ == '__main__':
cp = []
it = 600 #iteratons
start = time.perf_counter()
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target = WORK, args = ('n',q,it))
p2 = multiprocessing.Process(target=WORK, args=('x',q,it))
p3 = multiprocessing.Process(target=cons, args=(q,cp,))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print(q.get())
end = time.perf_counter()
print(end - start)
我在 Pycharm 和 Colab 中 运行 遇到了这个代码问题,如果我 运行 在 colab 中它只在 1000 次迭代和更少 [=22= 中工作正常]WORK() 进程,如果更多 - 它会冻结。
在 Pycharm 中,只有 500 次或更少的迭代才能正常工作
什么问题???有什么限制吗?
所以我发现不是很好的解决方案是删除连接或将其放在队列中的 dict 调用之后,它有助于获得 mor 限制,使用此代码它开始在 pycharm 中进行 1000 次迭代但 10000迭代再次陷入僵局
p1.join()
p2.join()
print(q.get())
p3.join()
end = time.perf_counter()
print(end - start)
进一步的改变帮助我通过添加 queuq maxsize 将迭代限制增加到 10000:
q = multiprocessing.Queue(maxsize = 1000)
那么这个队列的限制和规律是什么???
如何管理无尽的队列,例如来自websockets,他们连续发送数据
您的代码有几个问题。首先,根据 multiprocessing.Queue
上的文档,方法 empty
不 可靠。所以在函数 cons
中语句 while not q.empty():
是有问题的。但是即使方法 Queue.empty
是可靠的,这里还是存在 竞争条件 。您已经并行启动进程 WORK
和 cons
,其中前者正在将元素写入队列,而后者正在读取直到发现队列为空。但是,如果 cons
在 WORK
开始写入其第一个元素之前运行,它将发现队列立即为空,这不是您预期的结果。正如我在上面的评论中提到的,在检索到进程已写入的所有记录之前,您不得尝试加入正在写入队列的进程。
您遇到的另一个问题是您正在向 cons
传递一个空列表 cp
,您一直在向其追加。但是 cons
是属于不同地址 space 中的进程 运行 的函数,因此它附加到的 cp
列表是 而不是 与主进程中相同的 cp
列表。请注意这一点。
最后,cons
将其结果写入它正在读取的同一队列,因此主进程正在从同一队列读取该结果。所以我们有另一个竞争条件:一旦主进程被修改为在它加入所有进程之前不从这个队列读取,主进程和 cons
现在都从同一个队列并行读取。我们现在需要一个单独的输入和输出队列,这样就不会有冲突。这解决了这个竞争条件。
为了解决第一个竞争条件,WORK
进程应该写一个特殊的 sentinel 记录作为 记录的结尾 指标。如果 None
不是有效的 normal 记录,它可能是值 None
或者它可能是 any 特殊对象不能被误认为是真实记录。由于我们有两个进程将记录写入同一个输入队列供 cons
读取,我们最终会得到两个哨兵记录,cons
将不得不寻找这些记录才能知道确实没有更多剩余记录。
import multiprocessing
import time
SENTINEL = 'SENTINEL' # or None
def WORK(x, q, it):
for i in range(it):
t = x + '---' + str(i)
q.put(t)
q.put(SENTINEL) # show end of records
def cons(q_in, q_out, cp):
# We now are looking for two end of record indicators:
for record in iter(q_in.get, SENTINEL):
cp.append(record)
for record in iter(q_in.get, SENTINEL):
cp.append(record)
q_out.put(cp)
if __name__ == '__main__':
it = 600 #iteratons
start = time.perf_counter()
q_in = multiprocessing.Queue()
q_out = multiprocessing.Queue()
p1 = multiprocessing.Process(target=WORK, args = ('n', q_in, it))
p2 = multiprocessing.Process(target=WORK, args=('x', q_in, it))
cp = []
p3 = multiprocessing.Process(target=cons, args=(q_in, q_out, cp))
p1.start()
p2.start()
p3.start()
cp = q_out.get()
print(len(cp))
p1.join()
p2.join()
p3.join()
end = time.perf_counter()
print(end - start)
打印:
1200
0.1717168
import multiprocessing
import time
def WORK(x,q,it):
for i in range(it):
t = x + '---'+str(i)
q.put(t)
def cons(q,cp):
while not q.empty():
cp.append(q.get())
return q.put(cp)
if __name__ == '__main__':
cp = []
it = 600 #iteratons
start = time.perf_counter()
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target = WORK, args = ('n',q,it))
p2 = multiprocessing.Process(target=WORK, args=('x',q,it))
p3 = multiprocessing.Process(target=cons, args=(q,cp,))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print(q.get())
end = time.perf_counter()
print(end - start)
我在 Pycharm 和 Colab 中 运行 遇到了这个代码问题,如果我 运行 在 colab 中它只在 1000 次迭代和更少 [=22= 中工作正常]WORK() 进程,如果更多 - 它会冻结。 在 Pycharm 中,只有 500 次或更少的迭代才能正常工作 什么问题???有什么限制吗?
所以我发现不是很好的解决方案是删除连接或将其放在队列中的 dict 调用之后,它有助于获得 mor 限制,使用此代码它开始在 pycharm 中进行 1000 次迭代但 10000迭代再次陷入僵局
p1.join()
p2.join()
print(q.get())
p3.join()
end = time.perf_counter()
print(end - start)
进一步的改变帮助我通过添加 queuq maxsize 将迭代限制增加到 10000:
q = multiprocessing.Queue(maxsize = 1000)
那么这个队列的限制和规律是什么??? 如何管理无尽的队列,例如来自websockets,他们连续发送数据
您的代码有几个问题。首先,根据 multiprocessing.Queue
上的文档,方法 empty
不 可靠。所以在函数 cons
中语句 while not q.empty():
是有问题的。但是即使方法 Queue.empty
是可靠的,这里还是存在 竞争条件 。您已经并行启动进程 WORK
和 cons
,其中前者正在将元素写入队列,而后者正在读取直到发现队列为空。但是,如果 cons
在 WORK
开始写入其第一个元素之前运行,它将发现队列立即为空,这不是您预期的结果。正如我在上面的评论中提到的,在检索到进程已写入的所有记录之前,您不得尝试加入正在写入队列的进程。
您遇到的另一个问题是您正在向 cons
传递一个空列表 cp
,您一直在向其追加。但是 cons
是属于不同地址 space 中的进程 运行 的函数,因此它附加到的 cp
列表是 而不是 与主进程中相同的 cp
列表。请注意这一点。
最后,cons
将其结果写入它正在读取的同一队列,因此主进程正在从同一队列读取该结果。所以我们有另一个竞争条件:一旦主进程被修改为在它加入所有进程之前不从这个队列读取,主进程和 cons
现在都从同一个队列并行读取。我们现在需要一个单独的输入和输出队列,这样就不会有冲突。这解决了这个竞争条件。
为了解决第一个竞争条件,WORK
进程应该写一个特殊的 sentinel 记录作为 记录的结尾 指标。如果 None
不是有效的 normal 记录,它可能是值 None
或者它可能是 any 特殊对象不能被误认为是真实记录。由于我们有两个进程将记录写入同一个输入队列供 cons
读取,我们最终会得到两个哨兵记录,cons
将不得不寻找这些记录才能知道确实没有更多剩余记录。
import multiprocessing
import time
SENTINEL = 'SENTINEL' # or None
def WORK(x, q, it):
for i in range(it):
t = x + '---' + str(i)
q.put(t)
q.put(SENTINEL) # show end of records
def cons(q_in, q_out, cp):
# We now are looking for two end of record indicators:
for record in iter(q_in.get, SENTINEL):
cp.append(record)
for record in iter(q_in.get, SENTINEL):
cp.append(record)
q_out.put(cp)
if __name__ == '__main__':
it = 600 #iteratons
start = time.perf_counter()
q_in = multiprocessing.Queue()
q_out = multiprocessing.Queue()
p1 = multiprocessing.Process(target=WORK, args = ('n', q_in, it))
p2 = multiprocessing.Process(target=WORK, args=('x', q_in, it))
cp = []
p3 = multiprocessing.Process(target=cons, args=(q_in, q_out, cp))
p1.start()
p2.start()
p3.start()
cp = q_out.get()
print(len(cp))
p1.join()
p2.join()
p3.join()
end = time.perf_counter()
print(end - start)
打印:
1200
0.1717168