Block/Re-queue 当现有的 sidekiq 作业正在处理特定资源时处理其他 sidekiq 作业
Block/Re-queue other sidekiq jobs from processing when existing sidekiq job is processing a particular resource
我有 sidekiq 作业在处理多种类型的资源。但是,对于特定类型的资源,例如:资源 X,我需要确保在任何给定时间只有一个 sidekiq 作业可以处理该特定资源。
例如,如果我有 3 个 sidekiq 作业同时排队并想与资源 X 交互,则只有 1 个 sidekiq 作业可以处理资源 X,而其余 2 个 sidekiq 作业将不得不等待(或重新排队)直到当前正在处理资源的 sidekiq 作业完成。
目前,我正在尝试在数据库中添加一条记录 table 以记录 sidekiq 作业何时处理资源,并使用它来阻止其他 sidekiq 作业处理资源,直到该记录从数据库中删除添加它的 sidekiq 作业(当它完成处理资源 X 时)或经过一定时间后(例如:如果记录是在 5 分钟前创建的,则认为它不再拥有对资源的独占访问权) X 和下一个想要处理资源 X 的 sidekiq 作业可能会更改该记录并要求独占访问资源 X)。
我当前实现的伪代码:
def perform(res_id, res_type)
# Only applies to "RESOURCE_X"
if res_type == RESOURCE_X
if ResourceProcessor.where(res_id).empty? || ((Time.now-ResourceProcessor.where(res_id).first.created_at) > 5.minutes)
ResourceProcessor.create(res_id: res_id).save
process_resource_x(res_id)
else
SidekiqWorker.delayed(res_id, res_type, 5.minutes) #Try again later
return
end
#Letting other sidekiq jobs know they can now fight over who gets to process resource X
ResourceProcessor.where(res_id).destroy
else
process_other_resource(res_id)
end
end
不幸的是,我的解决方案不起作用。如果想要处理资源 X 的 sidekiq 作业之间存在延迟,它就可以正常工作。但是,如果想要处理资源 X 的作业同时到达,那么我的解决方案就会崩溃。
有什么方法可以仅在处理资源 X 时强制执行某种同步吗?
顺便说一句,我的 sidekiq 作业可能分布在多台机器上(但它们在专用机器上访问同一个 redis 服务器)。
我根据 Thomas 提供的评论做了更多研究。
他提供的 link 非常有用。他们实现了自己的自定义 Lock class 来实现他们想要的结果。但是,我没有使用他们的自定义锁码,因为我需要不同的行为。
我希望实现的具体行为是 "Re-queue if locked" 而不是 "Wait if lock"。
我可以使用其他工具,例如 redis-semaphore and with_advisory_gem。
我测试了 redis-semaphore 并发现它有问题。它没有正确返回锁定状态和资源计数。另外,在检查了 Github 上的问题后,在某些情况下,redis-semaphore 可能会自己陷入死锁,所以我决定放弃使用它。因此,我也决定不使用 with_advisory_gem,因为它的星数低于 redis-semaphore。
最后我找到了一种方法来实现我在问题中描述的锁定模式,即根据数据库中的值阻止 sidekiq 作业。我通过使用 rail 自己的 Locking-pessimistic class 锁定整个数据库行来处理多个 sidekiq 作业读取陈旧值的并发问题。这确保了在任何给定时间只有 1 个 sidekiq worker 可以访问保存锁定值的数据库行。锁定期保持在最低限度,因为只有读取和适用时,在锁定数据库行时执行写入操作。之后进行重新排队和清理等后续操作。
我有 sidekiq 作业在处理多种类型的资源。但是,对于特定类型的资源,例如:资源 X,我需要确保在任何给定时间只有一个 sidekiq 作业可以处理该特定资源。
例如,如果我有 3 个 sidekiq 作业同时排队并想与资源 X 交互,则只有 1 个 sidekiq 作业可以处理资源 X,而其余 2 个 sidekiq 作业将不得不等待(或重新排队)直到当前正在处理资源的 sidekiq 作业完成。
目前,我正在尝试在数据库中添加一条记录 table 以记录 sidekiq 作业何时处理资源,并使用它来阻止其他 sidekiq 作业处理资源,直到该记录从数据库中删除添加它的 sidekiq 作业(当它完成处理资源 X 时)或经过一定时间后(例如:如果记录是在 5 分钟前创建的,则认为它不再拥有对资源的独占访问权) X 和下一个想要处理资源 X 的 sidekiq 作业可能会更改该记录并要求独占访问资源 X)。
我当前实现的伪代码:
def perform(res_id, res_type)
# Only applies to "RESOURCE_X"
if res_type == RESOURCE_X
if ResourceProcessor.where(res_id).empty? || ((Time.now-ResourceProcessor.where(res_id).first.created_at) > 5.minutes)
ResourceProcessor.create(res_id: res_id).save
process_resource_x(res_id)
else
SidekiqWorker.delayed(res_id, res_type, 5.minutes) #Try again later
return
end
#Letting other sidekiq jobs know they can now fight over who gets to process resource X
ResourceProcessor.where(res_id).destroy
else
process_other_resource(res_id)
end
end
不幸的是,我的解决方案不起作用。如果想要处理资源 X 的 sidekiq 作业之间存在延迟,它就可以正常工作。但是,如果想要处理资源 X 的作业同时到达,那么我的解决方案就会崩溃。
有什么方法可以仅在处理资源 X 时强制执行某种同步吗?
顺便说一句,我的 sidekiq 作业可能分布在多台机器上(但它们在专用机器上访问同一个 redis 服务器)。
我根据 Thomas 提供的评论做了更多研究。
他提供的 link 非常有用。他们实现了自己的自定义 Lock class 来实现他们想要的结果。但是,我没有使用他们的自定义锁码,因为我需要不同的行为。
我希望实现的具体行为是 "Re-queue if locked" 而不是 "Wait if lock"。
我可以使用其他工具,例如 redis-semaphore and with_advisory_gem。 我测试了 redis-semaphore 并发现它有问题。它没有正确返回锁定状态和资源计数。另外,在检查了 Github 上的问题后,在某些情况下,redis-semaphore 可能会自己陷入死锁,所以我决定放弃使用它。因此,我也决定不使用 with_advisory_gem,因为它的星数低于 redis-semaphore。
最后我找到了一种方法来实现我在问题中描述的锁定模式,即根据数据库中的值阻止 sidekiq 作业。我通过使用 rail 自己的 Locking-pessimistic class 锁定整个数据库行来处理多个 sidekiq 作业读取陈旧值的并发问题。这确保了在任何给定时间只有 1 个 sidekiq worker 可以访问保存锁定值的数据库行。锁定期保持在最低限度,因为只有读取和适用时,在锁定数据库行时执行写入操作。之后进行重新排队和清理等后续操作。