单个生产者和多个单线程消费者

Single producer and multiple single-threaded consumers

我的应用程序从网络接收数据包并将它们分派给一个或多个 "processors"。 (每个数据包都属于一个预定义的 "stream",可以通过查看数据包数据来识别。)

当前有一个线程完成所有工作:

  1. 从网络设备获取数据包
  2. 识别每个数据包的处理器
  3. 将数据包分派给它的处理器

传入数据的接收速度为每秒 2000 万个数据包(10Gbps 的 60 字节数据包。)

然而,此解决方案只能跟上极少数的流和处理器。例如,在 10 个流的情况下,已经有大约 10-20% 的数据包丢失。

由于步骤 (3) 是最昂贵的一步,我计划将该工作委托给工作线程池。

但是,我必须小心,因为处理器本身不是线程安全的。所以只有一个工作线程可以同时将数据包分派给同一个处理器。

这似乎是基于任务的编程的一个很好的用例。但是我无法轻易地将 TBB 文档中解释的设计模式与我的问题相匹配。

所以我的问题是:如何组织我的消费者线程,以便它们将数据包均匀地分配给单线程处理器?

我不期待一个完整的解决方案,但我很乐意接受你的建议或随机想法:)

这是我对可能的解决方案的想法。

假设我们有 n 个处理器。让我们引入 n 个互斥量,每个处理器一个。我们还介绍一个数据包队列。所有传入的数据包都放入此队列。

一个工作线程是这样运行的:

  1. 从传入数据包队列中抓取数据包。
  2. 确定必要的处理器。
  3. 尝试获取对应的互斥体。如果锁获取成功,则处理数据包。否则,重新入队,转1.
  4. 处理完成后,转到步骤1。

可能的缺点:

  1. 数据包重新排队,这意味着它们可能 delayed/processed 乱序,这对您来说可能是一个交易破坏者(不确定)。
  2. 队列中的争用可能很激烈。您可能想为此使用无锁队列。
  3. 队列明显是在消耗额外的内存,不知道你有没有空余内存。

编辑:关于内存消耗的更多想法 - 当然,可以对队列可以消耗的内存量设置上限 - 然后,问题是当你 运行 超出时该怎么办记忆。我想说最好的办法就是开始丢弃数据包(我的印象是丢弃一些数据包对你来说不是什么大问题)直到队列稍微耗尽。

有点相关 - 我认为这个用例的良好队列实现应该不惜一切代价避免动态内存分配 - 预先分配内存并确保关键代码路径上没有分配。

我已经完成了一些嵌入式编程,我不得不处理相对较高的吞吐量 - 没有你在这里的速度快!希望您使用的硬件也比我以前使用的更强大......有一些简单的策略应该适用于您的情况!

1。 input/processing 队列和相关的内存管理很关键。

如果数据速率很高,传入数据的队列必须非常高效。您应该进行尽可能少的处理,否则您可能会丢失设备中的数据。 (我习惯于从某种缓冲区相对较小的快速串行设备读取数据,因此在不丢失数据的情况下可以保留设备多长时间存在实时限制。这让我养成了习惯处理从设备读取作为一个完全独立的任务,它只处理读取数据,没有别的。)

一系列非常简单的固定大小的预分配缓冲区的效率差不多:有一个 'free' 缓冲区队列和一个 'filled' 缓冲区队列。如果使用无锁链表,维护这些链表会非常快,并且 enqueue/dequeue 的操作在许多 OS 中很常见。

避免使用 malloc 或其他动态分配,因为当它们需要管理它们自己的 'free' 和 'allocated' 块的数据结构时,它们具有显着的(并且通常是不可预测的)开销。如果它们同时释放或分配内存,它们还可能执行锁定,这些锁定可能会意外地阻塞生产者或工作线程。相反,尝试找到用于分配和释放由 OS 为队列提供的整个页面的较低级别例程(unixy 平台上的 mmap,VirtualAllocEx)。这些通常需要做的工作要少得多,因为它们使用 MMU 功能来映射 RAM 的物理页面并且在内存中没有复杂的数据结构来维护,每次调用都有更可靠的 运行time ,并且 可以 足够快地扩展您的免费列表,如果它 运行ning 低。

在生产者中,不要担心小于整个块的单元。从队列中取出一个空闲块,打包一个充满数据的块,将其添加到待处理的队列中。如果您必须确保每个数据包在固定的时间段内得到处理,或者您需要处理 'bursty' 数据速率,那么仍然尝试从您的输入设备读取一个完整的缓冲区,但要么减小块是 'reasonable' 的时间量,或者使用超时并将部分填充的块排入队列以进行处理,并且 'fill' 剩余部分使用某种空数据包。我发现这样做通常比必须包含大量代码来处理部分填充的缓冲区要快。

如果可以,请非常仔细地设置生产者线程的处理器关联和线程优先级。理想情况下,您希望生产者线程具有比任何消费者线程更高的优先级,并绑定到特定的核心。没有什么可以阻止输入数据在 运行 缓冲区 space.

之前被读取

2。处理中

你说过有:

  1. 多个流
  2. 几个'processors',不是线程安全的

这里有用的是并行处理数据包上的处理器运行,但从你的问题中并不清楚这在多大程度上是可能的。

处理器是否跨流线程安全? (只要处理器在两个不同的流上运行,我们可以 运行 两个不同线程中的处理器吗?)

同一流中的不同处理器之间的处理器是否是线程安全的? (我们可以 运行 多个处理器在不同线程中的同一流上吗?)

处理器是否需要 运行 特定顺序?

在不知道这一点的情况下,仍然有一些通用的东西是有用的建议。

有第二个线程负责从生产者读取完整的缓冲区并将它们分派给适当的处理器(在其他线程中),然后将完整的缓冲区放回 'empty' 队列中进行处理。虽然你失去了一些直线效率(一个线程进行读取和调度将略微 'faster' 比两个),至少这种方式不会阻止从输入设备读取,如果有一个瞬间锁定。

创建或查找允许您将作业分配给线程池的库,尤其是当您的处理器数量多于您可以 运行 并行处理的线程数时。实现某种允许作业之间存在一些简单关系的作业队列也相对简单(例如“此作业要求作业 X 和 Y 已先完成”,“此作业不能 运行 与任何作业并行使用同一处理器的其他作业”)。即使是作业管理器仅 运行 在第一个可用线程上第一个 运行 可用作业的简单策略也可能非常有效。

尽量避免抄袭。如果处理器可以处理数据包 'in-place' 而无需从缓冲区复制它们,那么您就节省了很多无意义的周期。即使您确实必须复制,让多个线程从 'read only' 共享缓冲区复制数据也比让单个线程复制消息并将消息分派给多个线程要好。

如果检查给定数据包的处理器是否应该 运行 非常快,那么您最好有多个作业,每个作业检查它是否应该进行一些处理。与其让一个线程确定哪些处理器应该 运行 处理哪些数据包,不如让多个线程更快,一个用于每个处理器或一组处理器,检查每个数据包一次它的处理器是否应该 [=74] =].这只是因为在多个线程中对只读资源进行多次简单检查可能比在线程之间进行同步花费的时间更少。

如果您可以 运行 处理器并行处理来自不同流的数据,那么传递数据以获取流列表,然后为每个流启动一个作业是好主意。您还可以收集属于每个流的数据包列表,但同样,这是作业检查每个数据包的速度与在单线程中收集该列表并将每个数据包传递给所需时间之间的权衡他们各自的工作。

希望其中一些策略对您有用!让我们知道它是如何工作的……你必须处理大量的数据,而且很高兴知道什么对比我习惯的更快的数据速率有效和无效!祝你好运!

为什么不能使用多个队列,每个处理器一个? 这些队列可以是无锁的(没有互斥量)。

  1. 从网络设备中获取数据包
  2. 识别每个数据包的处理器 (PID)
  3. 将数据包推送到队列[PID]
  4. 一个工人:处理队列中的数据包[k]

对于类似的问题,我使用了无锁环形缓冲区轮询,自动覆盖最旧的数据包。