多处理事件使我的代码变慢

multiprocessing Event makes my code slow


我有一个主进程正在使用来自许多不同市场的数据。它对消息进行一些初步处理,然后将其传递到多处理队列(每个独特的市场都有自己的专用进程,在队列的另一端称之为 Parse)。然后主进程为涉及的特定市场调用 mp.Event.set()

Parse 中是对 mp.Event.wait() 的调用,它会暂停进程,除非有数据被送入队列。在 Parse 结束时它调用 mp.Event.clear(). 我这样做是因为我正在使用 while True 循环在数据通过队列时捕获数据。如果我不暂停 Parse,它将使用 100% 的 CPU,而我没有足够的内核(更不用说它非常浪费)。

今天晚上我意识到 Parse 比 运行 花费的时间太长了,从 0.3 秒到 18 秒。市场数据消息可能每 12 毫秒出现一次,因此显然这是行不通的。 Parse各方面都非常快,除了mp.Event.wait()。此调用几乎占了 运行 时间的 100%。

我将所有 mp.Event 对象存储在配置文件中定义的字典中。我担心会发生以下两种情况之一:

  1. 设置和清除事件的每个实例都会阻塞所有其他事件,其方式类似于 mp.Manager 处理共享对象的方式。

  2. mp.Event 只是很慢,并且它的状态需要很长时间才能跨进程传播...

我正在考虑通过使用 zmq (ZeroMQ) 而不是 mp.Queue 管道传输数据来解决这个问题,但在我设置它之前,我想问问聪明的人。

我在这里做错了什么吗?有什么办法可以加快 mp.Event 标记的速度吗?

编辑

为了回应评论,这里有一个例子:

config.py 文件中,我这样定义字典:

E,Q={},{}
for m in all_markets:
    E[m] = mp.Event()
    Q[m] = mp.Queue()

然后在读取数据的主进程中,我调用 sort,它看起来像这样:

def sort(message, m):
    if message satisfies condition1:
        define some args
        Q[m].put(message, *args)
        E[m].set()
    if message satisfies condition2:
        #basically the same

然后最后在Parse,程序启动时启动:

def Parse(message,m,Q,E):
    while True:
        E[m].wait()
        message = Q[m].get()
        #do a bunch of processing on the message
        #put the results in some other queues
        E[m].clear()

EDIT2

Procs 是这样生成和启动的:

def mitosis():
    mp.Process(target=main).start()

def pstart(m,func,**kwargs):
    if func=='parser':
        p = mp.Process(target=parser, args=(m, Q, E, *args) )
        p.start()

def main():
    PROCS={}
    for m in all_markets:
        for procs in proclist:
        PROCS[(m,proc)] = pstart(m,proc,**kwargs)

我认为你的问题是你的 Event 代码被破坏了。

想象一下这个场景:

  • 主进程为 m 调用 sort
  • sort 调用 Q[m].putE[m].set.
  • Parse 唤醒,Q[m].get,并开始处理。
  • 主进程再次调用 sort 相同的 m
  • sort 调用 Q[m].putE[m].set.
  • Parse 处理完第一条消息,调用 E[m].clear.

现在 Parse 正在等待再次设置 Event。这可能不会发生很长一段时间。而且,即使它发生得很快,它仍然不会赶上;它只为每个 Event.wait.

做一个 Q[m].get

所以,你最终得到的结果是 Parse 似乎越来越落后。当您尝试分析它以找出原因时,您会发现它一直在等待 E[m].wait。但这不是因为 E[m].wait 慢,只是因为事件触发器丢失了。

这不是这里的唯一竞争条件,它只是最明显的一个。

普遍的问题是您不能以这种方式使用事件对象。通常,您可以通过使用 Condition 来解决它,或者一次性触发和自重置 Events,并在每个 Event.[= 之后循环 Q[m].get(block=False) 46=]

但实际上,一开始就没有必要这样做。如果您只是完全删除 Event,当 Parse 调用 Q[m].get 时,它会阻塞直到那里有东西。因此,当 sort 调用 Q[m].put 时,它会唤醒 Parse,并且不需要其他任何东西。

事实上,Queue 的全部意义在于它本质上是自同步的。如果你不想那样,使用 Pipe,然后你可以使用 Condition 来发送信号。但在简单的情况下,这只是 Queue.

的一个低效版本