单一生产者,多个消费者,有一些不寻常的曲折
Single Producer, Multiple Consumers with a few unusual twists
我有一个 (Posix) 服务器,它充当许多客户端到另一个上游服务器的代理。消息通常从上游服务器向下流动,然后进行匹配,并推送到对该流量感兴趣的某些客户端子集(维护来自上游服务器的 FIFO 顺序)。目前,这个代理服务器是单线程的,使用事件循环(例如 - select、epoll 等),但现在我想让它成为多线程的,这样代理可以更充分地利用整个机器并实现更高的吞吐量。
我的高级设计是拥有一个包含 N 个工作线程的池(其中 N 是机器上核心数量的某个小倍数),每个线程 运行 都有自己的事件循环。每个客户端连接都将分配给一个特定的工作线程,然后该工作线程将负责为该客户端连接期间的所有 I/O + 超时需求提供服务。我还打算有一个专用线程从上游服务器中提取消息。一旦一条消息被读入,它的内容可以被认为是不变的/不变的,直到不再需要和回收它为止。工作人员从不更改消息内容——他们只是根据需要将它们传递给他们的客户。
我的第一个问题是:客户利益的匹配应该由producer线程还是worker线程来做?
在前一种方法中,对于每个工作者线程,生产者可以检查工作者客户的兴趣(例如 - 组成员资格)。如果消息与任何客户端匹配,则它可以将消息推送到该工作人员的专用队列中。这种方法需要生产者和每个工人之间就其客户很少改变的兴趣进行某种同步。
在后一种方法中,生产者只是将每条消息推送到某种由所有工作线程共享的队列中。然后,每个工作线程检查 ALL 消息是否符合其客户的兴趣,并处理每条匹配的消息。这是对通常的 SPMC 问题的扭曲,在该问题中,消费者通常被假设为单方面 为自己获取 一个元素,而不是所有消费者都需要对每个元素进行一些处理。这种方法将匹配工作分配给多个线程,这似乎是可取的,但我担心 可能 导致线程之间发生更多争用,具体取决于我们如何实现它们的同步。
在这两种方法中,当任何工作线程不再需要一条消息时,就需要回收它。因此,需要进行一些跟踪以了解何时不再有工作线程需要消息。
我的第二个问题是:跟踪任何工作线程是否仍需要消息的好方法是什么?
执行此操作的一种简单方法是为每条消息分配一个计数,即在首次生成消息时仍需要处理该消息的工作线程数。然后,当每个工作人员完成一条消息的处理后,它将以线程安全的方式减少计数,并且 if/when 计数变为零,我们知道它可以被回收。
另一种方法是在消息传入时为其分配 64b 序列号,然后每个线程都可以跟踪并记录它们以某种方式处理过的最高序列号。然后我们可以以某种方式回收所有工作线程中序列号小于或等于最小处理序列号的所有消息。
后一种方法似乎可以更轻松地允许懒惰的回收过程,减少跨线程同步的必要性。也就是说,您可以有一个 "clean-up" 线程,该线程仅定期 运行 运行并计算工作线程中的最小值,而需要的线程间同步要少得多。例如,如果我们假设一个 64b 整数的读写是原子的,并且一个 worker 的完全处理的序列号总是单调递增,那么 "clean-up" 线程可以周期性地读取 worker 的完全处理的计数(可能有一些内存屏障)并计算最小值。
第三个问题:让工作人员意识到他们的队列中有新工作要做的最佳方式是什么?
每个工作线程都将管理自己的客户端文件描述符和超时事件循环。每个工作线程是否最好只拥有自己的管道,生产者可以将信号数据写入该管道以促使它们采取行动?还是他们应该定期检查他们的队列中是否有新工作?有更好的方法吗?
最后一个问题:生产者和消费者之间的队列应该使用什么样的数据结构和同步?
我知道无锁数据结构,但我不太清楚它们在我的情况下是否更可取,或者我是否应该只使用一个简单的互斥锁来处理影响队列。此外,在共享队列方法中,我不完全确定工作线程应该如何跟踪 "where" 它在处理队列时的状态。
任何见解将不胜感激!谢谢!
根据您的问题描述,无论如何都需要为每个客户对每个消息进行客户兴趣匹配,因此匹配工作无论发生在哪种类型的线程中都是相同的。这表明应该进行匹配在客户端线程中以提高并发性。如果 "producer" 线程确保消息在其他线程知道它们的可用性之前被刷新到主内存(技术上,"synchronize memory with respect to other threads"),那么同步开销应该不是一个主要问题,因为客户端线程可以全部同时从主存中读取信息,彼此不同步。客户端线程将无法修改消息,但他们不需要这样做。
消息回收最好通过跟踪每个线程的当前消息数而不是使用消息特定计数器来完成,因为消息特定计数器会带来并发瓶颈。
我认为您不需要正式的排队机制。 "producer" 线程可以简单地更新一个 volatile 变量,该变量包含已刷新到主内存的最新消息的编号,客户端线程可以在空闲时检查该变量,如果没有则休眠工作可用。您可以在线程管理上变得更复杂,但额外的效率提升可能很小。
我认为您不需要复杂的数据结构。您需要可变变量来表示可用于处理的最新消息的数量以及每个客户端线程已处理的最新消息的数量。您需要将消息本身刷新到主内存。您需要某种方法从消息编号找到主内存中的消息,可能使用指针的循环缓冲区,或者如果消息的长度都相同则使用消息。关于要在线程之间通信的数据,您实际上并不需要太多其他东西。
我有一个 (Posix) 服务器,它充当许多客户端到另一个上游服务器的代理。消息通常从上游服务器向下流动,然后进行匹配,并推送到对该流量感兴趣的某些客户端子集(维护来自上游服务器的 FIFO 顺序)。目前,这个代理服务器是单线程的,使用事件循环(例如 - select、epoll 等),但现在我想让它成为多线程的,这样代理可以更充分地利用整个机器并实现更高的吞吐量。
我的高级设计是拥有一个包含 N 个工作线程的池(其中 N 是机器上核心数量的某个小倍数),每个线程 运行 都有自己的事件循环。每个客户端连接都将分配给一个特定的工作线程,然后该工作线程将负责为该客户端连接期间的所有 I/O + 超时需求提供服务。我还打算有一个专用线程从上游服务器中提取消息。一旦一条消息被读入,它的内容可以被认为是不变的/不变的,直到不再需要和回收它为止。工作人员从不更改消息内容——他们只是根据需要将它们传递给他们的客户。
我的第一个问题是:客户利益的匹配应该由producer线程还是worker线程来做?
在前一种方法中,对于每个工作者线程,生产者可以检查工作者客户的兴趣(例如 - 组成员资格)。如果消息与任何客户端匹配,则它可以将消息推送到该工作人员的专用队列中。这种方法需要生产者和每个工人之间就其客户很少改变的兴趣进行某种同步。
在后一种方法中,生产者只是将每条消息推送到某种由所有工作线程共享的队列中。然后,每个工作线程检查 ALL 消息是否符合其客户的兴趣,并处理每条匹配的消息。这是对通常的 SPMC 问题的扭曲,在该问题中,消费者通常被假设为单方面 为自己获取 一个元素,而不是所有消费者都需要对每个元素进行一些处理。这种方法将匹配工作分配给多个线程,这似乎是可取的,但我担心 可能 导致线程之间发生更多争用,具体取决于我们如何实现它们的同步。
在这两种方法中,当任何工作线程不再需要一条消息时,就需要回收它。因此,需要进行一些跟踪以了解何时不再有工作线程需要消息。
我的第二个问题是:跟踪任何工作线程是否仍需要消息的好方法是什么?
执行此操作的一种简单方法是为每条消息分配一个计数,即在首次生成消息时仍需要处理该消息的工作线程数。然后,当每个工作人员完成一条消息的处理后,它将以线程安全的方式减少计数,并且 if/when 计数变为零,我们知道它可以被回收。
另一种方法是在消息传入时为其分配 64b 序列号,然后每个线程都可以跟踪并记录它们以某种方式处理过的最高序列号。然后我们可以以某种方式回收所有工作线程中序列号小于或等于最小处理序列号的所有消息。
后一种方法似乎可以更轻松地允许懒惰的回收过程,减少跨线程同步的必要性。也就是说,您可以有一个 "clean-up" 线程,该线程仅定期 运行 运行并计算工作线程中的最小值,而需要的线程间同步要少得多。例如,如果我们假设一个 64b 整数的读写是原子的,并且一个 worker 的完全处理的序列号总是单调递增,那么 "clean-up" 线程可以周期性地读取 worker 的完全处理的计数(可能有一些内存屏障)并计算最小值。
第三个问题:让工作人员意识到他们的队列中有新工作要做的最佳方式是什么?
每个工作线程都将管理自己的客户端文件描述符和超时事件循环。每个工作线程是否最好只拥有自己的管道,生产者可以将信号数据写入该管道以促使它们采取行动?还是他们应该定期检查他们的队列中是否有新工作?有更好的方法吗?
最后一个问题:生产者和消费者之间的队列应该使用什么样的数据结构和同步?
我知道无锁数据结构,但我不太清楚它们在我的情况下是否更可取,或者我是否应该只使用一个简单的互斥锁来处理影响队列。此外,在共享队列方法中,我不完全确定工作线程应该如何跟踪 "where" 它在处理队列时的状态。
任何见解将不胜感激!谢谢!
根据您的问题描述,无论如何都需要为每个客户对每个消息进行客户兴趣匹配,因此匹配工作无论发生在哪种类型的线程中都是相同的。这表明应该进行匹配在客户端线程中以提高并发性。如果 "producer" 线程确保消息在其他线程知道它们的可用性之前被刷新到主内存(技术上,"synchronize memory with respect to other threads"),那么同步开销应该不是一个主要问题,因为客户端线程可以全部同时从主存中读取信息,彼此不同步。客户端线程将无法修改消息,但他们不需要这样做。
消息回收最好通过跟踪每个线程的当前消息数而不是使用消息特定计数器来完成,因为消息特定计数器会带来并发瓶颈。
我认为您不需要正式的排队机制。 "producer" 线程可以简单地更新一个 volatile 变量,该变量包含已刷新到主内存的最新消息的编号,客户端线程可以在空闲时检查该变量,如果没有则休眠工作可用。您可以在线程管理上变得更复杂,但额外的效率提升可能很小。
我认为您不需要复杂的数据结构。您需要可变变量来表示可用于处理的最新消息的数量以及每个客户端线程已处理的最新消息的数量。您需要将消息本身刷新到主内存。您需要某种方法从消息编号找到主内存中的消息,可能使用指针的循环缓冲区,或者如果消息的长度都相同则使用消息。关于要在线程之间通信的数据,您实际上并不需要太多其他东西。