在同一个 worker 中使用 Sidekiq 的 `unique_for` 和 `Sidekiq::Limiter.concurrent` 时避免重复作业
Avoiding duplicate jobs when using Sidekiq's `unique_for` and `Sidekiq::Limiter.concurrent` in the same worker
总结
我正在努力消除 Sidekiq 作业的重复数据并限制同一工作人员的并发性,同时又不引入导致工作人员错过更新的竞争条件。以下是我尝试过的解决方案以及它们不令我满意的原因。你能建议改进我的解决方案或其他方法来解决这个问题吗?
解决方案 1:使用 unique_until: :start
和 Sidekiq::Limiter.concurrent
目前,工人使用 unique_until: :start
和 Sidekiq::Limiter.concurrent
锁。
这个解决方案的缺点是这两个 Sidekiq 功能之间的交互导致队列中有许多重复的作业。以下是导致重复作业的一系列事件:
Worker.perform_async(1)
排队作业 A1
- 作业 A1 启动,释放其唯一锁,并获得并发锁
Worker.perform_async(2)
排队作业 B1
- 作业 B1 启动,释放其唯一锁,未能获得并发锁,并重新安排自身。
Worker
现在没有锁定参数 2.
Worker.perform_async(2)
将作业 B2 加入队列。我希望这是一个空操作,但它会排入另一个作业,因为我们在步骤 4 中释放了唯一锁。
Worker.perform_async(2)
将作业 B3 排队...等等。
解决方案 2:使用 unique_until: :success
和 Sidekiq::Limiter.concurrent
如果我切换到 unique_until: :success
(未指定 unique_until
时的默认行为),我可以解决重复作业问题。
这个解决方案的缺点是它打开了一个竞争条件,工作人员错过了在工作 运行 时发生的更新。
解决方案 3:用专用的 Sidekiq 进程替换限制器
如果我停止使用 Sidekiq::Limiter.concurrent
而是使用由只有一个线程的 Sidekiq 进程处理的队列,我可以解决重复作业问题并避免竞争条件。
这个解决方案的缺点是我不得不使用资源受限的硬件,所以添加第二个 Sidekiq 进程是有影响的。
解决方案 1 的代码示例
这显示了我如何使用唯一性和限制器功能:
class ExpensiveWorker
include Sidekiq::Worker
sidekiq_options unique_for: 30.minutes, unique_until: :start
EXPENSIVE_JOB_LIMITER = Sidekiq::Limiter.concurrent('expensive_job',
1,
wait_timeout: 5.seconds,
lock_timeout: 15.minutes)
def perform(id)
EXPENSIVE_JOB_LIMITER.within_limit do
Rails.logger.info "Processing #{id}..."
sleep 10
end
end
end
关于我试图解决的问题的详细信息
为了简单起见,我将把我们正在处理的数据描述为作者模型,每个模型都有很多书。我们有 RebuildAuthorImagesWorker
和 ClassifyAuthorGenreWorker
,它们都将作者 ID 作为它们唯一的参数。
这两个工作人员都对作者和作者的书籍执行 CPU 和 RAM 密集型计算。我们使用 Sidekiq::Limiter.concurrent
来确保这些工人中只有一个在任何给定时间有一份活跃的工作。我们这样做是为了避免影响我们微不足道的服务器。 (我们还有很多其他工人不需要这样限制。)
由于多个活跃用户或一个用户更新了同一作者的多本书,在短时间内对同一作者或该作者的图书进行多次更新是很常见的。我们使用 unique_for: :start
来防止 RebuildAuthorImagesWorker
被同一作者多次排队。 ClassifyAuthorGenreWorker
同上。由于与 运行 相关的系统开销,我们希望避免重复作业。这些作业是幂等的,因此重复的作业不会导致数据问题。 (每个工人一个工作排队给同一个作者是可以正常的。)
如果 RebuildAuthorImagesWorker
在作者 A 上主动 运行,然后用户 X 在 RebuildAuthorImagesWorker
作业完成之前对作者 A 进行了更新,那么我们 做 想要为作者 A 排入第二个 RebuildAuthorImagesWorker
作业,因此我们不会错过将用户 X 更新的数据合并到图像中。这就是为什么我们使用 unique_until: :start
.
一个想法:
当用户想要更改作者 A 时,我会为作者 A 排入一个预定的、唯一的 UpdateAuthorJob
,它会在 10 分钟后 更新他们的信息。这样,用户可以对作者进行大量更改,系统将在执行实际更新工作之前等待 10 分钟的冷却时间,确保您将所有更新作为一个组获得。
总结
我正在努力消除 Sidekiq 作业的重复数据并限制同一工作人员的并发性,同时又不引入导致工作人员错过更新的竞争条件。以下是我尝试过的解决方案以及它们不令我满意的原因。你能建议改进我的解决方案或其他方法来解决这个问题吗?
解决方案 1:使用 unique_until: :start
和 Sidekiq::Limiter.concurrent
目前,工人使用 unique_until: :start
和 Sidekiq::Limiter.concurrent
锁。
这个解决方案的缺点是这两个 Sidekiq 功能之间的交互导致队列中有许多重复的作业。以下是导致重复作业的一系列事件:
Worker.perform_async(1)
排队作业 A1- 作业 A1 启动,释放其唯一锁,并获得并发锁
Worker.perform_async(2)
排队作业 B1- 作业 B1 启动,释放其唯一锁,未能获得并发锁,并重新安排自身。
Worker
现在没有锁定参数 2. Worker.perform_async(2)
将作业 B2 加入队列。我希望这是一个空操作,但它会排入另一个作业,因为我们在步骤 4 中释放了唯一锁。Worker.perform_async(2)
将作业 B3 排队...等等。
解决方案 2:使用 unique_until: :success
和 Sidekiq::Limiter.concurrent
如果我切换到 unique_until: :success
(未指定 unique_until
时的默认行为),我可以解决重复作业问题。
这个解决方案的缺点是它打开了一个竞争条件,工作人员错过了在工作 运行 时发生的更新。
解决方案 3:用专用的 Sidekiq 进程替换限制器
如果我停止使用 Sidekiq::Limiter.concurrent
而是使用由只有一个线程的 Sidekiq 进程处理的队列,我可以解决重复作业问题并避免竞争条件。
这个解决方案的缺点是我不得不使用资源受限的硬件,所以添加第二个 Sidekiq 进程是有影响的。
解决方案 1 的代码示例
这显示了我如何使用唯一性和限制器功能:
class ExpensiveWorker
include Sidekiq::Worker
sidekiq_options unique_for: 30.minutes, unique_until: :start
EXPENSIVE_JOB_LIMITER = Sidekiq::Limiter.concurrent('expensive_job',
1,
wait_timeout: 5.seconds,
lock_timeout: 15.minutes)
def perform(id)
EXPENSIVE_JOB_LIMITER.within_limit do
Rails.logger.info "Processing #{id}..."
sleep 10
end
end
end
关于我试图解决的问题的详细信息
为了简单起见,我将把我们正在处理的数据描述为作者模型,每个模型都有很多书。我们有 RebuildAuthorImagesWorker
和 ClassifyAuthorGenreWorker
,它们都将作者 ID 作为它们唯一的参数。
这两个工作人员都对作者和作者的书籍执行 CPU 和 RAM 密集型计算。我们使用 Sidekiq::Limiter.concurrent
来确保这些工人中只有一个在任何给定时间有一份活跃的工作。我们这样做是为了避免影响我们微不足道的服务器。 (我们还有很多其他工人不需要这样限制。)
由于多个活跃用户或一个用户更新了同一作者的多本书,在短时间内对同一作者或该作者的图书进行多次更新是很常见的。我们使用 unique_for: :start
来防止 RebuildAuthorImagesWorker
被同一作者多次排队。 ClassifyAuthorGenreWorker
同上。由于与 运行 相关的系统开销,我们希望避免重复作业。这些作业是幂等的,因此重复的作业不会导致数据问题。 (每个工人一个工作排队给同一个作者是可以正常的。)
如果 RebuildAuthorImagesWorker
在作者 A 上主动 运行,然后用户 X 在 RebuildAuthorImagesWorker
作业完成之前对作者 A 进行了更新,那么我们 做 想要为作者 A 排入第二个 RebuildAuthorImagesWorker
作业,因此我们不会错过将用户 X 更新的数据合并到图像中。这就是为什么我们使用 unique_until: :start
.
一个想法:
当用户想要更改作者 A 时,我会为作者 A 排入一个预定的、唯一的 UpdateAuthorJob
,它会在 10 分钟后 更新他们的信息。这样,用户可以对作者进行大量更改,系统将在执行实际更新工作之前等待 10 分钟的冷却时间,确保您将所有更新作为一个组获得。