multiprocessing.Queue 的高性能替代品

High-performance replacement for multiprocessing.Queue

我的分布式应用程序由许多将任务推送到多个 FIFO 队列的生产者和每个队列的多个消费者组成。所有这些组件都位于 单个 节点上,因此不涉及网络。

此模式得到 Python 的内置 multiprocessing.Queue 的完美支持,但是当我扩展我的应用程序时,队列实现似乎是一个瓶颈。我没有发送大量数据,所以内存共享不能解决问题。我需要的是每秒 10^4-10^5 条小消息的快速保证传递。每条消息大约 100 字节。

我是快速分布式计算领域的新手,我对大量的选项感到非常困惑。还有RabbitMQ、Redis、Kafka等

ZeroMQ 是一个更专注和紧凑的替代方案,它还有 nanomsg 和 nng 等后继者。此外,在没有代理的情况下,实现诸如多对多队列之类的具有保证交付的东西似乎并不平凡。

如果有人能指出一种 "standard" 使用更快的框架来做类似事情的方法,我将不胜感激。

我认为这在很大程度上部分取决于您对个别消息的重视程度。

如果每个人都至关重要,并且您必须考虑在某处发生故障时他们会发生什么情况,那么像 RabbitMQ 这样的框架可能会有用。 RabbitMQ 有一个代理,可以将其配置为某种高可用性、高可靠性模式。通过正确的队列设置,RabbitMQ 将照管您的消息,直到系统的某些部分使用它们。

要做到这一切,RabbitMQ 需要一个代理。这使得它相当慢。尽管有一次讨论了在 ZeroMQ 的底层协议 (zmtp) 之上重新实现 RabbitMQ 并取消代理,而是在端点中实现所有功能。

相比之下,ZeroMQ 做的要少得多,无法保证在发生故障时,您的消息实际上最终会到达预期的目的地。如果一个进程死掉,或者网络连接失败,那么消息很可能已经丢失。更新的版本可以设置为主动监控连接,这样如果网络电缆断开或某个进程在某处终止,套接字另一端的端点可以很快得到通知。如果然后在 ZMQ 的 actor 框架之上实现一个通信顺序流程框架(想想:消息确认等。这会减慢它的速度),您最终可以得到一个系统,端点可以通过该系统确定消息已传输到预期目的地.

无代理允许 zmq 非常快。它在许多不同的传输中都很有效,范围从 inproctcp,所有这些都可以混合在一起。如果您不担心进程崩溃或网络连接失败,ZMQ 可以保证开箱即用地传递消息。

因此,决定在您的应用程序中什么是重要的有助于选择您正在使用的技术作为其中的一部分 - RabbitMQ、ZeroMQ 等。一旦您决定了,那么 "how to get the patterns I need" 减少为 "what patterns does that technology support"。据我所知,RabbitMQ 纯粹是 pub/sub(每个都可以有很多),而 ZeroMQ 有更多。

在尝试了一些可用的实现和框架之后,我仍然找不到适合我的任务的东西。要么太慢要么太重。

为了解决这个问题,我和我的同事开发了这个:https://github.com/alex-petrenko/faster-fifo

faster-fifo 是 Python 的 multiprocessing.Queue 的直接替代品,速度明显更快。事实上,它在我关心的配置(很多生产者,很少的消费者)中快了 30 倍,因为它还支持消费者端的 get_many() 方法。

它是无中断的、轻量级的,支持任意多对多配置,使用 pthread 同步原语为 Posix 系统实现。

我已尝试使用 Redis 服务器队列来替换 Python 标准多处理队列。 Redis 不行! Python 是最好的,最快的并且可以接受你扔给它的任何类型的数据,其中使用 Redis 和复杂的数据类型,例如带有大量 numpy 数组的 dict 等......你必须腌制或 json dumps/loads 这会增加流程的开销。

干杯, 史蒂夫