如何更改这些生产者-消费者微服务以允许并行处理?

How do I change these producer-consumer microservices to allow parallel processing?

我有几个微服务(在 ruby 中实现,尽管我怀疑这对我的问题是否重要)。其中一个提供项目,另一个处理它们,然后将它们标记为已处理(通过 DELETE 调用)

提供者有一个 /items 端点,它以 JSON 格式列出了一堆用 id 标识的项目。它还有一个 DELETE /items/id 端点,用于从列表中删除一项(大概是因为它已被处理)

"processor" 中的代码(非常简化)如下所示:

items = <GET provider/items>
items.each do |item|
  process item
  <DELETE provider/items/#{item.id}>
end

这有几个问题,但我想解决的一个问题是它不是线程安全的,因此我无法并行运行它。如果两个工作人员同时开始处理项目,他们将"step onto each other's toes":他们将获得相同的项目列表,然后(尝试)处理并删除每个项目两次。

更改此设置以允许并行处理的最简单方法是什么?

您可以假设我有 ruby 可用。我宁愿将更改保持在最低限度,并且如果可能的话宁愿不安装其他 gem。 Sidekiq 可作为消费者上的排队系统使用。

一些备选方案(只是集思广益):

  1. 只需放弃 HTTP 并使用带有队列的发布-订阅。让生产者排队项目,一些消费者处理它们(并触发状态变化,如果你喜欢的话,在这种情况下使用 HTTP)。
  2. 如果您真的想要 HTTP,我认为还有一些缺失。如果您的项目的状态是 pendingprocessed,则您的状态机中有一个 hidden/implicit 状态:in_progress(或其他)。一旦您想到它,情况就会变得更加清晰:您的 GET /items 不是幂等的(因为它将项目的状态从待处理更改为进行中)因此首先不应该是 GET。

    一个。另一种方法是添加一个通过 POST 创建的新实体(例如批处理),并将一些项目分组并发送。已经 return 的项目将不会成为未来批次的一部分,然后您可以将整个批次标记为已完成(例如 PUT /batches/X/done)。这很快就会变得疯狂,因为您将开始重新实现排队系统和 plain/explicit(参见 c)HTTP 中已经存在的功能(确认、超时、错误)。

    b。一个稍微简单的替代方案:只需在 POST/PUT(两种情况下都很奇怪)端点中将 /items 标记为正在处理(并且不再 return 它们因为它只有 returns 未决项目)。但是,同样的错误和超时问题也适用。

    c。让生产者明确并通过 PUT 向其他服务请求处理项目。您可以在正文中包含所有需要的数据,或者将其用作 ping 并让处理器通过 GET 请求信息。您可以在任一侧添加异步处理(但在处理器中可能更好)。

我会诚实地做 1(除非有令人信服的理由)。

What is the simplest way I can change this setup to allow for parallel processing?

如果可以在服务端升级代码,或者增加中间人代码,那么最简单的方式就是队列。

如果您只喜欢客户端,没有中间人,也没有客户与客户之间的谈话,偶尔冗余也可以,那么这里有一些想法。

  1. 使用随机播放减少冲突

    • 如果您的服务器可以接收一个不存在的对象的 DELETE
    • 而且"process item"成本+时间比较少
    • 并且流程与顺序无关
    • 然后你可以洗牌以减少碰撞:

      items.shuffle.each do |item|
        process item
      
  2. 使用 HEAD

    检查项目是否存在
    • 如果你的服务器有 HEAD 方法
    • 并且有办法查找一项
    • 与 "process item"
    • 相比,HTTP 连接更便宜+更快
    • 如果该项目不存在则可以跳过:

      items.each do |item|
        next if !<HEAD provider/items/id>
      
  3. 使用轮询循环刷新项目

    • 如果这些项目类似于您轮询正在进行的工作池
    • 并且与顺序无关
    • 并且 GET 请求是幂等的,即可以多次请求所有项目
    • 并且 DELETE 请求 returns 通知您该项目不存在的结果
    • 然后您可以处理项目直到遇到冗余,然后刷新项目列表:

      loop do
        items = <GET provider/items>
        if items.blank?
          sleep 1
          next
        end
        items.each do |item|
          process item
          <DELETE provider/items/#{item.id}>
          break if DELETE returns a code that indicates "already deleted"
        end
      end
      
  4. 以上所有结合使用轮询循环、洗牌和 HEAD 检查。

    • 这是非常有效的,因为没有队列,也没有中间人,也没有客户对客户的谈话。
    • 当多个客户端检查某个项目是否存在然后开始处理它时,仍然会出现罕见的冗余 "process item";实际上,这几乎是零概率,尤其是当有很多项目时。

      loop do
        items = <GET provider/items>
        if items.blank?
          sleep 1 
          next
        end
        items.shuffle do |item|
          break if !<HEAD provider/items/id>
          process item
          <DELETE provider/items/#{item.id}>
          break if DELETE returns a code that indicates "already deleted"
        end
      end
      

在我看来,问题在于并行化此实现,您认为每个线程都将调用:

<GET provider/items>

一种解决方案是先获取所有项目然后进行异步处理。

我的 Ruby 不存在,但它可能看起来像这样:

class HardWorker
    include Sidekiq::Worker
    def perform(item)
        process item
        <DELETE provider/items/#{item.id}>
    end
end

items = <GET provider/items>

items.each do |item|
    HardWorker.perform_async(item)
end

这样你的 "producer" 就是循环,消费者就是异步 HardWorker