多处理事件使我的代码变慢
multiprocessing Event makes my code slow
我有一个主进程正在使用来自许多不同市场的数据。它对消息进行一些初步处理,然后将其传递到多处理队列(每个独特的市场都有自己的专用进程,在队列的另一端称之为 Parse
)。然后主进程为涉及的特定市场调用 mp.Event.set()
。
在 Parse
中是对 mp.Event.wait()
的调用,它会暂停进程,除非有数据被送入队列。在 Parse
结束时它调用 mp.Event.clear().
我这样做是因为我正在使用 while True
循环在数据通过队列时捕获数据。如果我不暂停 Parse
,它将使用 100% 的 CPU,而我没有足够的内核(更不用说它非常浪费)。
今天晚上我意识到 Parse
比 运行 花费的时间太长了,从 0.3 秒到 18 秒。市场数据消息可能每 12 毫秒出现一次,因此显然这是行不通的。 Parse
各方面都非常快,除了mp.Event.wait()
。此调用几乎占了 运行 时间的 100%。
我将所有 mp.Event
对象存储在配置文件中定义的字典中。我担心会发生以下两种情况之一:
设置和清除事件的每个实例都会阻塞所有其他事件,其方式类似于 mp.Manager
处理共享对象的方式。
mp.Event
只是很慢,并且它的状态需要很长时间才能跨进程传播...
我正在考虑通过使用 zmq
(ZeroMQ) 而不是 mp.Queue
管道传输数据来解决这个问题,但在我设置它之前,我想问问聪明的人。
我在这里做错了什么吗?有什么办法可以加快 mp.Event
标记的速度吗?
编辑
为了回应评论,这里有一个例子:
在 config.py
文件中,我这样定义字典:
E,Q={},{}
for m in all_markets:
E[m] = mp.Event()
Q[m] = mp.Queue()
然后在读取数据的主进程中,我调用 sort
,它看起来像这样:
def sort(message, m):
if message satisfies condition1:
define some args
Q[m].put(message, *args)
E[m].set()
if message satisfies condition2:
#basically the same
然后最后在Parse
,程序启动时启动:
def Parse(message,m,Q,E):
while True:
E[m].wait()
message = Q[m].get()
#do a bunch of processing on the message
#put the results in some other queues
E[m].clear()
EDIT2
Procs 是这样生成和启动的:
def mitosis():
mp.Process(target=main).start()
def pstart(m,func,**kwargs):
if func=='parser':
p = mp.Process(target=parser, args=(m, Q, E, *args) )
p.start()
def main():
PROCS={}
for m in all_markets:
for procs in proclist:
PROCS[(m,proc)] = pstart(m,proc,**kwargs)
我认为你的问题是你的 Event
代码被破坏了。
想象一下这个场景:
- 主进程为
m
调用 sort
。
sort
调用 Q[m].put
和 E[m].set
.
Parse
唤醒,Q[m].get
,并开始处理。
- 主进程再次调用
sort
相同的 m
。
sort
调用 Q[m].put
和 E[m].set
.
Parse
处理完第一条消息,调用 E[m].clear
.
现在 Parse
正在等待再次设置 Event
。这可能不会发生很长一段时间。而且,即使它发生得很快,它仍然不会赶上;它只为每个 Event.wait
.
做一个 Q[m].get
所以,你最终得到的结果是 Parse
似乎越来越落后。当您尝试分析它以找出原因时,您会发现它一直在等待 E[m].wait
。但这不是因为 E[m].wait
慢,只是因为事件触发器丢失了。
这不是这里的唯一竞争条件,它只是最明显的一个。
普遍的问题是您不能以这种方式使用事件对象。通常,您可以通过使用 Condition
来解决它,或者一次性触发和自重置 Event
s,并在每个 Event
.[= 之后循环 Q[m].get(block=False)
46=]
但实际上,一开始就没有必要这样做。如果您只是完全删除 Event
,当 Parse
调用 Q[m].get
时,它会阻塞直到那里有东西。因此,当 sort
调用 Q[m].put
时,它会唤醒 Parse
,并且不需要其他任何东西。
事实上,Queue
的全部意义在于它本质上是自同步的。如果你不想那样,使用 Pipe
,然后你可以使用 Condition
来发送信号。但在简单的情况下,这只是 Queue
.
的一个低效版本
我有一个主进程正在使用来自许多不同市场的数据。它对消息进行一些初步处理,然后将其传递到多处理队列(每个独特的市场都有自己的专用进程,在队列的另一端称之为 Parse
)。然后主进程为涉及的特定市场调用 mp.Event.set()
。
在 Parse
中是对 mp.Event.wait()
的调用,它会暂停进程,除非有数据被送入队列。在 Parse
结束时它调用 mp.Event.clear().
我这样做是因为我正在使用 while True
循环在数据通过队列时捕获数据。如果我不暂停 Parse
,它将使用 100% 的 CPU,而我没有足够的内核(更不用说它非常浪费)。
今天晚上我意识到 Parse
比 运行 花费的时间太长了,从 0.3 秒到 18 秒。市场数据消息可能每 12 毫秒出现一次,因此显然这是行不通的。 Parse
各方面都非常快,除了mp.Event.wait()
。此调用几乎占了 运行 时间的 100%。
我将所有 mp.Event
对象存储在配置文件中定义的字典中。我担心会发生以下两种情况之一:
设置和清除事件的每个实例都会阻塞所有其他事件,其方式类似于
mp.Manager
处理共享对象的方式。mp.Event
只是很慢,并且它的状态需要很长时间才能跨进程传播...
我正在考虑通过使用 zmq
(ZeroMQ) 而不是 mp.Queue
管道传输数据来解决这个问题,但在我设置它之前,我想问问聪明的人。
我在这里做错了什么吗?有什么办法可以加快 mp.Event
标记的速度吗?
编辑
为了回应评论,这里有一个例子:
在 config.py
文件中,我这样定义字典:
E,Q={},{}
for m in all_markets:
E[m] = mp.Event()
Q[m] = mp.Queue()
然后在读取数据的主进程中,我调用 sort
,它看起来像这样:
def sort(message, m):
if message satisfies condition1:
define some args
Q[m].put(message, *args)
E[m].set()
if message satisfies condition2:
#basically the same
然后最后在Parse
,程序启动时启动:
def Parse(message,m,Q,E):
while True:
E[m].wait()
message = Q[m].get()
#do a bunch of processing on the message
#put the results in some other queues
E[m].clear()
EDIT2
Procs 是这样生成和启动的:
def mitosis():
mp.Process(target=main).start()
def pstart(m,func,**kwargs):
if func=='parser':
p = mp.Process(target=parser, args=(m, Q, E, *args) )
p.start()
def main():
PROCS={}
for m in all_markets:
for procs in proclist:
PROCS[(m,proc)] = pstart(m,proc,**kwargs)
我认为你的问题是你的 Event
代码被破坏了。
想象一下这个场景:
- 主进程为
m
调用sort
。 sort
调用Q[m].put
和E[m].set
.Parse
唤醒,Q[m].get
,并开始处理。- 主进程再次调用
sort
相同的m
。 sort
调用Q[m].put
和E[m].set
.Parse
处理完第一条消息,调用E[m].clear
.
现在 Parse
正在等待再次设置 Event
。这可能不会发生很长一段时间。而且,即使它发生得很快,它仍然不会赶上;它只为每个 Event.wait
.
Q[m].get
所以,你最终得到的结果是 Parse
似乎越来越落后。当您尝试分析它以找出原因时,您会发现它一直在等待 E[m].wait
。但这不是因为 E[m].wait
慢,只是因为事件触发器丢失了。
这不是这里的唯一竞争条件,它只是最明显的一个。
普遍的问题是您不能以这种方式使用事件对象。通常,您可以通过使用 Condition
来解决它,或者一次性触发和自重置 Event
s,并在每个 Event
.[= 之后循环 Q[m].get(block=False)
46=]
但实际上,一开始就没有必要这样做。如果您只是完全删除 Event
,当 Parse
调用 Q[m].get
时,它会阻塞直到那里有东西。因此,当 sort
调用 Q[m].put
时,它会唤醒 Parse
,并且不需要其他任何东西。
事实上,Queue
的全部意义在于它本质上是自同步的。如果你不想那样,使用 Pipe
,然后你可以使用 Condition
来发送信号。但在简单的情况下,这只是 Queue
.