如何从多个线程写入 TcpListener?

How to write from multiple threads to a TcpListener?

假设我有一个静态列表 List<string> dataQueue,其中数据不断以随机间隔和变化的速率添加 (1-1000 entries/second).

我的主要objective是将列表中的数据发送到服务器,我使用的是TcpClientclass.

到目前为止我所做的是,我在单线程中将数据同步发送到客户端

byte[] bytes = Encoding.ASCII.GetBytes(message);

tcpClient.GetStream().Write(bytes, 0, bytes.Length);
//The client is already connected at the start

发送数据后,我会从列表中删除该条目。

这工作正常,但发送数据的速度不够快,列表被填充并消耗更多内存,因为列表被迭代并一个接一个地发送。

我的问题是我可以使用同一个 tcpClient 对象从另一个线程并发写入,还是可以使用另一个 tcpClient 对象在另一个线程中与同一服务器建立新连接?将此数据发送到服务器的最有效(最快)方式是什么?

PS: 我不想使用 UDP

对;这是一个有趣的话题,我想我可以发表意见。听起来您在多个线程之间共享一个套接字 - 完全有效 只要您 非常小心地做到这一点。 TCP 套接字是逻辑字节流,因此您不能并发 这样使用它,但如果您的代码足够快,您可以非常有效地共享套接字,每条消息连续.

可能首先要看的是:您实际上是如何将数据写入套接字的?您的 framing/encoding 代码是什么样的?如果此代码只是 bad/inefficient:它可能会得到改进。例如,它是否通过天真的 Encode 调用为每个 string 间接创建了一个新的 byte[]?是否涉及多个缓冲区?取景时是否多次调用Send?它是如何解决数据包碎片问题的?等等

作为首先尝试的事情 - 您可以避免一些缓冲区分配:

var enc = Encoding.ASCII;
byte[] bytes = ArrayPool<byte>.Shared.Rent(enc.GetMaxByteCount(message.Length));
// note: leased buffers can be oversized; and in general, GetMaxByteCount will
// also be oversized; so it is *very* important to track how many bytes you've used
int byteCount = enc.GetBytes(message, 0, message.Length, bytes, 0);
tcpClient.GetStream().Write(bytes, 0, byteCount);
ArrayPool<byte>.Shared.Return(bytes);

这使用租用缓冲区来避免每次都创建 byte[] - 这可以极大地改善 GC 影响。如果是我,我也可能会使用原始 Socket 而不是 TcpClientStream 抽象,坦率地说,这不会给你带来很多好处。注意:如果您有其他框架要做:将其包含在您租用的缓冲区的大小中,在编写每个片段时使用适当的偏移量,并且只写 一次 - 即准备整个缓冲区一次- 避免多次调用 Send.


现在,听起来你有一个队列和专门的作家;也就是说,您的 app 代码附加到队列中,而您的 writer 代码使事物出队并将它们写入套接字。这是一种合理的实现方式,尽管我会添加一些注释:

  • List<T> 是一种 糟糕的 实现队列的方式——从一开始就删除一些东西需要重新洗牌其他所有东西(这很昂贵);如果可能,更喜欢Queue<T>,它是为您的场景完美实现的
  • 它将需要同步,这意味着您需要确保一次只有一个线程更改队列 - 这通常通过简单的 lock 完成,即 lock(queue) {queue.Enqueue(newItem);}SomeItem next; lock(queue) { next = queue.Count == 0 ? null : queue.Dequeue(); } if (next != null) {...write it...}.

这种方法简单,并且在避免数据包碎片方面有一些优势 - 编写器可以使用暂存缓冲区,并且只有在达到特定阈值时才实际写入套接字被缓冲,或者当队列为空时,例如 - 但它有可能在发生停顿时产生大量积压。

但是!发生积压的事实表明 某事 没有跟上;这可能是 网络 (带宽)、远程服务器 (CPU) - 或者可能是本地出站网络硬件。如果这仅发生在随后自行解决的小事件中 - 很好(尤其是当某些出站消息很大时发生这种情况),但是:值得关注。

如果这种积压反复出现,那么坦率地说,您需要考虑您对当前设计的简单饱和,因此您需要疏通其中一个关键点:

  • 确保您的编码代码高效是第零步
  • 您可以将 encode 步骤移动到应用程序代码中,即在 获取锁之前准备一个帧,对消息进行编码,并且只排队一个完全准备好的框架;这意味着 writer 线程除了出列、写入、回收之外不需要做任何事情 - 但它使缓冲区管理更加复杂(显然你不能回收缓冲区,直到它们被完全处理)
  • 减少数据包碎片可能会有很大帮助,如果您还没有采取措施来实现的话
  • 否则,您可能需要(调查堵塞后):
    • 更好的本地网络硬件 (NIC) 或物理机硬件(CPU 等)
    • 多个套接字(和queues/workers)之间循环,分配负载
    • 可能是多个服务器进程,每个服务器有一个端口,因此您的多个套接字正在与不同的进程通信
    • 更好的服务器
    • 多个服务器

注意:在涉及多个套接字的任何场景中,您要小心不要发疯并拥有太多个专用工作线程;如果这个数字超过,比如说,10 个线程,你 可能 想考虑其他选择 - 可能涉及异步 IO and/or 管道(如下)。


为了完整起见,另一种基本方法是从应用程序代码编写;这种方法更简单,并且避免了未发送工作的积压,但是:这意味着现在您的应用程序代码线程 自身 将在负载下备份。如果您的应用程序代码线程实际上是工作线程,并且它们在 sync/lock 上被阻塞,那么这可能 真的 很糟糕;你 想要使线程池饱和,因为你最终可能会遇到没有可用线程池线程来满足 解除阻塞所需的 IO 工作的情况 无论哪个作家活跃,这都会让您陷入 真正的 问题。这通常不是您想要用于高 load/volume 的方案,因为它很快就会出现问题 - 并且很难避免数据包碎片,因为每个单独的消息都无法知道是否有更多消息即将发送进来


最近要考虑的另一个选择是 "pipelines";这是 .NET 中的一个新的 IO 框架,专为高容量网络而设计,特别关注异步 IO、缓冲区重用和实现良好的 buffer/back-log 机制,可以使用简单的写入器方法(写入时同步)并且不将其转化为直接发送 - 它表现为一个异步写入器,可以访问积压,这使得避免数据包碎片变得简单而高效。这是一个相当高级的领域,但它可能非常有效。对你来说有问题的部分是:它是为整个异步使用而设计的,甚至是写——所以如果你的应用程序代码当前是同步的,这可能很难实现。但是:这是一个需要考虑的领域。我有很多博客文章讨论这个主题,还有一系列 OSS 示例和现实生活中使用管道的库,我可以向您指出,但是:这不是 "quick fix" - 它是彻底改革整个 IO 层。它也不是灵丹妙药 - 它只能消除由于本地 IO 处理成本而产生的开销。