处理SQS项队列的多线程方法

Multithreaded approach to process SQS item Queue

在这个场景中,我必须从队列中轮询 AWS SQS 消息,每个异步请求最多可以获取 10 个 sqs items/messages。一旦我轮询了这些项目,我就必须在 kubernetes pod 上处理这些项目。项目处理包括从几个 API 调用中获得响应,这可能需要一些时间,然后将项目保存到 DB 和 S3。 我做了一些研发并得出以下结论

  1. 要使用消费者生产者模型,1 个线程将轮询项目,另一个线程将处理项目或使用多线程处理项目
  2. 维护一个数据结构,该结构将包含准备好处理的 sqs 轮询项目,DS 可以是阻塞收集或并发队列
  3. 在线程池和项目处理中使用任务并行库。
  4. 频道可以使用

我的查询

  1. 实现最佳性能或提高 TPS 的最佳方法是什么。
  2. Can/Should我用数据流TPL
  3. 多线程或带异步任务的单线程

我不熟悉 Kubernetes,但在最大化吞吐量时需要考虑很多事情。

您提到的所有内容都是 IO 绑定的,而不是 CPU 绑定的。因此,使用 TPL 会使边际收益的设计过于复杂。参见:https://docs.microsoft.com/en-us/dotnet/csharp/async#recognize-cpu-bound-and-io-bound-work

您的 Kubernetes pods 可能存在网络限制。例如,Azure Function Apps on Consumption Plans 限制为 1,200 个出站连接。其他服务也将有一些明确的限制。 https://docs.microsoft.com/en-us/azure/azure-functions/manage-connections?tabs=csharp#connection-limit。由于您的工作性质,您很可能会在需要在多线程上处理 IO 工作之前达到这些限制。

您可能还需要考虑您所依赖的服务的限制,并确保它们能够处理吞吐量。

您可能需要考虑使用信号量来限制活动连接数以满足您的基础设施和外部依赖限制https://docs.microsoft.com/en-us/dotnet/api/system.threading.semaphoreslim?view=net-5.0

也就是说,每秒 500 条消息是一个现实的数量。为了进一步改进它,您可以考虑使用具有独立资源限制的多个进程来处理队列。

这在很大程度上取决于您的用例的具体情况以及您想投入多少努力。

但是,我将解释在做出这样的决定时我会使用的思考过程。

处理 SQS 消息的天真的解决方案是按顺序一次处理一个(即没有并发)。这并不意味着您一次只能发送一条消息,因为您可以向集群添加更多 pods。

因此,即使在那种幼稚的解决方案中,您也有一个可以利用的并发点,但它有很多开销。减少开销的方法通常是利用相同的开销,但用它处理更多的消息。这就是为什么,例如,SQS 允许您在一次调用中获得 1-10 条消息,而不仅仅是一条消息。它将调用开销分散到 10 条消息上。在天真的解决方案中,开销是启动整个过程的成本。使用处理 更多 条消息意味着并发处理。

我发现为了稳定和灵活的并发,您需要许多并发点,但每个并发点都限制在某个可配置的并行度(无论是硬编码还是实际配置)。这样你就可以调整它们中的每一个以获得最佳输出(当你有空闲 CPU 和内存时增加,否则减少)。

那么,额外的并发可以从哪里引入呢?这是一个进步,其中每一步都更好地利用资源,但需要更多的努力。

  • 每个 SQS API 调用获取 10 条消息而不是一条消息并同时处理它们。这样你就可以控制 2 个并发点:pods 的数量,并发消息的数量(最多 10 个)。
  • 有几个任务,每个任务获取 1-10 个任务并同时处理它们。这是 3 个并发点:Pods、任务和每个任务的消息。这两种解决方案 都受到 处理时间不同的消息的影响,这意味着单个长 运行 消息将“阻止”所有其他 1-9 个“时隙”的工作,从而有效减少并发性低于配置。
  • 设置一个 TPL 数据流块以并发处理消息,并设置一个(或几个)任务连续获取消息并泵入该块。请记住,需要显式删除 SQS 消息,因此该块也需要接收消息句柄,以便在处理后删除消息。
  • TPL 数据流“管道”由几个块组成,每个块都有自己的并发度。当您有不同的消息处理步骤且每个步骤都有不同的限制(例如,不同的 API 具有不同的限制配置)时,这很有用。

我个人非常喜欢 Dataflow 库,并且使用起来很舒服,所以我会直接使用它。但当性能不是问题时,更简单的解决方案也是有效的。

不熟悉您的用例,或者不熟悉您正在使用的技术,但这听起来像是一个非常常见的消息处理场景。

几条准则:

  • 首先,这些是指导方针,您的用例可能与此处评论的人习惯的用例大不相同。
  • 每当您想增加吞吐量时,您都需要确定 你的瓶颈 ,并在 CPU 瓶颈中茁壮成长,确保你 充分利用它。 CPU 负载通常是最昂贵的,并且 通常可以为自动缩放提供更可靠的指标。显然,根据您的远程 api 调用和您的数据库,您可能会遇到其他瓶颈 - SQS 队列大小也是一个很好的自动缩放指标,但请记住,如果您的瓶颈是,自动缩放不能保证增加您的吞吐量数据库或 API 相关。
  • 我不会寻求具有复杂数据结构的奇特解决方案,同样,不熟悉您的用例,所以我可能是错的 - 但请保持简单。应该有一个线程负责轮询队列,当它发现新消息时,它应该创建一个任务来处理一批。每个处理批次通常应该有一个任务 - 让 ThreadPool 处理线程数。
  • 不熟悉 .net SQS 库。但是,我熟悉其他库的非常相似的解决方案。大多数用于排队的图书馆已经为您做好了这一切,您真的不必担心。当高度优化的库已经找到新消息时,您可能应该只调用一个回调函数。这些库可能已经为每个批次创建了一个新任务 - 您只需要注册到他们的回调,并确保您 await 任何 I/O 绑定代码。

编辑:我提出的解决方案确实有一个限制,即单个消息可以阻止整个批次,这不一定是坏事 - 如果您的解决方案需要对不同的消息进行不同的处理,而您不需要想要创建此内部批处理依赖项,TPL DataFlow 绝对是您用例的一个很好的解决方案。

是的,这听起来很像 TPL Dataflow 的任务,它是一种用途广泛但功能强大的工具。您的第一个链 link 将从队列中获取消息(不一定是单线程的,您只需传递一些委托)。您还可以通过这种方式控制本地“排队”的项目数量。

然后你可以以任何你想要的方式“订阅”你的工作人员——你甚至可以自定义它,这样“错误”的处理就会被放回你的队列中——即使你的处理是 IO 绑定的也没有关系或不。如果是——好吧,TPL 数据流是异步的,如果不是——好吧,没问题,TPL 数据流也可以是同步的。或者你可以启动一些线程池线程,没什么大不了的。