如何更改这些生产者-消费者微服务以允许并行处理?
How do I change these producer-consumer microservices to allow parallel processing?
我有几个微服务(在 ruby 中实现,尽管我怀疑这对我的问题是否重要)。其中一个提供项目,另一个处理它们,然后将它们标记为已处理(通过 DELETE 调用)
提供者有一个 /items
端点,它以 JSON 格式列出了一堆用 id 标识的项目。它还有一个 DELETE /items/id
端点,用于从列表中删除一项(大概是因为它已被处理)
"processor" 中的代码(非常简化)如下所示:
items = <GET provider/items>
items.each do |item|
process item
<DELETE provider/items/#{item.id}>
end
这有几个问题,但我想解决的一个问题是它不是线程安全的,因此我无法并行运行它。如果两个工作人员同时开始处理项目,他们将"step onto each other's toes":他们将获得相同的项目列表,然后(尝试)处理并删除每个项目两次。
更改此设置以允许并行处理的最简单方法是什么?
您可以假设我有 ruby 可用。我宁愿将更改保持在最低限度,并且如果可能的话宁愿不安装其他 gem。 Sidekiq 可作为消费者上的排队系统使用。
一些备选方案(只是集思广益):
- 只需放弃 HTTP 并使用带有队列的发布-订阅。让生产者排队项目,一些消费者处理它们(并触发状态变化,如果你喜欢的话,在这种情况下使用 HTTP)。
如果您真的想要 HTTP,我认为还有一些缺失。如果您的项目的状态是 pending 和 processed,则您的状态机中有一个 hidden/implicit 状态:in_progress(或其他)。一旦您想到它,情况就会变得更加清晰:您的 GET /items
不是幂等的(因为它将项目的状态从待处理更改为进行中)因此首先不应该是 GET。
一个。另一种方法是添加一个通过 POST 创建的新实体(例如批处理),并将一些项目分组并发送。已经 return 的项目将不会成为未来批次的一部分,然后您可以将整个批次标记为已完成(例如 PUT /batches/X/done
)。这很快就会变得疯狂,因为您将开始重新实现排队系统和 plain/explicit(参见 c)HTTP 中已经存在的功能(确认、超时、错误)。
b。一个稍微简单的替代方案:只需在 POST
/PUT
(两种情况下都很奇怪)端点中将 /items
标记为正在处理(并且不再 return 它们因为它只有 returns 未决项目)。但是,同样的错误和超时问题也适用。
c。让生产者明确并通过 PUT 向其他服务请求处理项目。您可以在正文中包含所有需要的数据,或者将其用作 ping 并让处理器通过 GET 请求信息。您可以在任一侧添加异步处理(但在处理器中可能更好)。
我会诚实地做 1(除非有令人信服的理由)。
What is the simplest way I can change this setup to allow for parallel processing?
如果可以在服务端升级代码,或者增加中间人代码,那么最简单的方式就是队列。
如果您只喜欢客户端,没有中间人,也没有客户与客户之间的谈话,偶尔冗余也可以,那么这里有一些想法。
使用随机播放减少冲突
- 如果您的服务器可以接收一个不存在的对象的 DELETE
- 而且"process item"成本+时间比较少
- 并且流程与顺序无关
然后你可以洗牌以减少碰撞:
items.shuffle.each do |item|
process item
使用 HEAD
检查项目是否存在
- 如果你的服务器有 HEAD 方法
- 并且有办法查找一项
- 与 "process item"
相比,HTTP 连接更便宜+更快
如果该项目不存在则可以跳过:
items.each do |item|
next if !<HEAD provider/items/id>
使用轮询循环刷新项目
- 如果这些项目类似于您轮询正在进行的工作池
- 并且与顺序无关
- 并且 GET 请求是幂等的,即可以多次请求所有项目
- 并且 DELETE 请求 returns 通知您该项目不存在的结果
然后您可以处理项目直到遇到冗余,然后刷新项目列表:
loop do
items = <GET provider/items>
if items.blank?
sleep 1
next
end
items.each do |item|
process item
<DELETE provider/items/#{item.id}>
break if DELETE returns a code that indicates "already deleted"
end
end
以上所有结合使用轮询循环、洗牌和 HEAD 检查。
- 这是非常有效的,因为没有队列,也没有中间人,也没有客户对客户的谈话。
当多个客户端检查某个项目是否存在然后开始处理它时,仍然会出现罕见的冗余 "process item";实际上,这几乎是零概率,尤其是当有很多项目时。
loop do
items = <GET provider/items>
if items.blank?
sleep 1
next
end
items.shuffle do |item|
break if !<HEAD provider/items/id>
process item
<DELETE provider/items/#{item.id}>
break if DELETE returns a code that indicates "already deleted"
end
end
在我看来,问题在于并行化此实现,您认为每个线程都将调用:
<GET provider/items>
一种解决方案是先获取所有项目然后进行异步处理。
我的 Ruby 不存在,但它可能看起来像这样:
class HardWorker
include Sidekiq::Worker
def perform(item)
process item
<DELETE provider/items/#{item.id}>
end
end
items = <GET provider/items>
items.each do |item|
HardWorker.perform_async(item)
end
这样你的 "producer" 就是循环,消费者就是异步 HardWorker
。
我有几个微服务(在 ruby 中实现,尽管我怀疑这对我的问题是否重要)。其中一个提供项目,另一个处理它们,然后将它们标记为已处理(通过 DELETE 调用)
提供者有一个 /items
端点,它以 JSON 格式列出了一堆用 id 标识的项目。它还有一个 DELETE /items/id
端点,用于从列表中删除一项(大概是因为它已被处理)
"processor" 中的代码(非常简化)如下所示:
items = <GET provider/items>
items.each do |item|
process item
<DELETE provider/items/#{item.id}>
end
这有几个问题,但我想解决的一个问题是它不是线程安全的,因此我无法并行运行它。如果两个工作人员同时开始处理项目,他们将"step onto each other's toes":他们将获得相同的项目列表,然后(尝试)处理并删除每个项目两次。
更改此设置以允许并行处理的最简单方法是什么?
您可以假设我有 ruby 可用。我宁愿将更改保持在最低限度,并且如果可能的话宁愿不安装其他 gem。 Sidekiq 可作为消费者上的排队系统使用。
一些备选方案(只是集思广益):
- 只需放弃 HTTP 并使用带有队列的发布-订阅。让生产者排队项目,一些消费者处理它们(并触发状态变化,如果你喜欢的话,在这种情况下使用 HTTP)。
如果您真的想要 HTTP,我认为还有一些缺失。如果您的项目的状态是 pending 和 processed,则您的状态机中有一个 hidden/implicit 状态:in_progress(或其他)。一旦您想到它,情况就会变得更加清晰:您的
GET /items
不是幂等的(因为它将项目的状态从待处理更改为进行中)因此首先不应该是 GET。一个。另一种方法是添加一个通过 POST 创建的新实体(例如批处理),并将一些项目分组并发送。已经 return 的项目将不会成为未来批次的一部分,然后您可以将整个批次标记为已完成(例如
PUT /batches/X/done
)。这很快就会变得疯狂,因为您将开始重新实现排队系统和 plain/explicit(参见 c)HTTP 中已经存在的功能(确认、超时、错误)。b。一个稍微简单的替代方案:只需在
POST
/PUT
(两种情况下都很奇怪)端点中将/items
标记为正在处理(并且不再 return 它们因为它只有 returns 未决项目)。但是,同样的错误和超时问题也适用。c。让生产者明确并通过 PUT 向其他服务请求处理项目。您可以在正文中包含所有需要的数据,或者将其用作 ping 并让处理器通过 GET 请求信息。您可以在任一侧添加异步处理(但在处理器中可能更好)。
我会诚实地做 1(除非有令人信服的理由)。
What is the simplest way I can change this setup to allow for parallel processing?
如果可以在服务端升级代码,或者增加中间人代码,那么最简单的方式就是队列。
如果您只喜欢客户端,没有中间人,也没有客户与客户之间的谈话,偶尔冗余也可以,那么这里有一些想法。
使用随机播放减少冲突
- 如果您的服务器可以接收一个不存在的对象的 DELETE
- 而且"process item"成本+时间比较少
- 并且流程与顺序无关
然后你可以洗牌以减少碰撞:
items.shuffle.each do |item| process item
使用 HEAD
检查项目是否存在- 如果你的服务器有 HEAD 方法
- 并且有办法查找一项
- 与 "process item" 相比,HTTP 连接更便宜+更快
如果该项目不存在则可以跳过:
items.each do |item| next if !<HEAD provider/items/id>
使用轮询循环刷新项目
- 如果这些项目类似于您轮询正在进行的工作池
- 并且与顺序无关
- 并且 GET 请求是幂等的,即可以多次请求所有项目
- 并且 DELETE 请求 returns 通知您该项目不存在的结果
然后您可以处理项目直到遇到冗余,然后刷新项目列表:
loop do items = <GET provider/items> if items.blank? sleep 1 next end items.each do |item| process item <DELETE provider/items/#{item.id}> break if DELETE returns a code that indicates "already deleted" end end
以上所有结合使用轮询循环、洗牌和 HEAD 检查。
- 这是非常有效的,因为没有队列,也没有中间人,也没有客户对客户的谈话。
当多个客户端检查某个项目是否存在然后开始处理它时,仍然会出现罕见的冗余 "process item";实际上,这几乎是零概率,尤其是当有很多项目时。
loop do items = <GET provider/items> if items.blank? sleep 1 next end items.shuffle do |item| break if !<HEAD provider/items/id> process item <DELETE provider/items/#{item.id}> break if DELETE returns a code that indicates "already deleted" end end
在我看来,问题在于并行化此实现,您认为每个线程都将调用:
<GET provider/items>
一种解决方案是先获取所有项目然后进行异步处理。
我的 Ruby 不存在,但它可能看起来像这样:
class HardWorker
include Sidekiq::Worker
def perform(item)
process item
<DELETE provider/items/#{item.id}>
end
end
items = <GET provider/items>
items.each do |item|
HardWorker.perform_async(item)
end
这样你的 "producer" 就是循环,消费者就是异步 HardWorker
。