如何防止并行 Sidekiq 作业在 Rails 中执行代码
How to prevent parallel Sidekiq jobs from executing code in Rails
我有大约 10 名工人从事以下工作:
user = User.find_or_initialize_by(email: 'some-email@address.com')
if user.new_record?
# ... some code here that does something taking around 5 seconds or so
elsif user.persisted?
# ... some code here that does something taking around 5 seconds or so
end
user.save
问题是在某些时候,两个或更多的工作人员运行这个代码在准确的时间,因此我后来发现两个或更多的用户有相同的email
,在我应该总是只收到唯一的电子邮件。
我的情况不可能为 email
创建数据库唯一索引,因为唯一的电子邮件是有条件的——有些用户应该有唯一的电子邮件,有些则没有。
值得注意的是,我的 User
模型具有唯一性验证,但它仍然对我没有帮助,因为在 .find_or_initialize_by
和 .save
之间,有一个代码依赖于用户对象是否已创建。
我尝试了悲观和乐观锁定,但对我没有帮助,或者我可能只是没有正确实施它...您是否对此有一些建议。
我唯一能想到的解决方案是在执行这些代码行时锁定其他线程(Sidekiq 作业),但我不太确定如何实现这个,也不知道这是否是一个建议方法。
如有任何帮助,我将不胜感激。
编辑
在我的具体情况下,很难将电子邮件参数放入作业中,因为该作业比上面所说的要复杂一些。该作业实际上是一个导出脚本,其中作业的一部分是上面的代码。我不认为也可以将上面的功能分离到另一个单独的工作人员中......因为整个工作流程应该是串行的,并且不应并行/异步处理任何部分。此作业只是由另一个作业管理的作业之一,而另一个作业最终由 master 作业管理。
悲观锁定是您想要的,但仅适用于存在的记录 - 您不能将其与 new_record?
一起使用,因为数据库中还没有任何内容可锁定。
我设法通过以下方式解决了我的问题:
我发现我实际上可以在 Rails DB Uniqueness Partial Index 中添加一个 where
子句,因此我现在可以在其他并发的数据库级别为不同类型的用户设置唯一性条件如果已创建作业,现在将引发 ActiveRecord::RecordNotUnique
错误。
现在唯一的问题是 .find_or_initialize_by
和 .save
之间的代码,因为这些代码依赖于用户对象,其中总是只有一个并发作业应该始终获得 .new_record? == true
,然后其他并发作业应该触发 .persisted? == true
因为一个作业总是第一个创建它,但是......所有这些都不起作用,因为它只在行 .save
调用数据库唯一性索引验证的地方。因此,我设法通过在这些条件之前放置 .save
来解决这个问题,同时我为 .save
添加了一个救援块,如果它触发 ActiveRecord::RecordNotUnique
错误,以确保异步作业不会发生冲突。代码现在如下所示。
user = User.find_or_initialize_by(email: 'some-email@address.com')
begin
user.save
is_new_record = user.new_record?
is_persisted = user.persisted?
rescue ActiveRecord::RecordNotUnique => exception
MyJob.perform_later(params_hash)
end
if is_new_record
# do something if not yet created
elsif is_persisted
# do something if already created
end
我建议使用不同的架构来绕过这个问题。
生产者-工人模型怎么样,其中一个主 Sidekiq 进程获取电子邮件地址列表,然后为每封电子邮件生成一个工作 Sidekiq 进程? Sidekiq 通过专供 master 和 worker 进行通信的专用队列让这一切变得简单。
这样做,电子邮件地址就成为工作人员的一个输入参数,所以我们知道通过构造工作人员不会互相干扰数据。
我有大约 10 名工人从事以下工作:
user = User.find_or_initialize_by(email: 'some-email@address.com')
if user.new_record?
# ... some code here that does something taking around 5 seconds or so
elsif user.persisted?
# ... some code here that does something taking around 5 seconds or so
end
user.save
问题是在某些时候,两个或更多的工作人员运行这个代码在准确的时间,因此我后来发现两个或更多的用户有相同的email
,在我应该总是只收到唯一的电子邮件。
我的情况不可能为 email
创建数据库唯一索引,因为唯一的电子邮件是有条件的——有些用户应该有唯一的电子邮件,有些则没有。
值得注意的是,我的 User
模型具有唯一性验证,但它仍然对我没有帮助,因为在 .find_or_initialize_by
和 .save
之间,有一个代码依赖于用户对象是否已创建。
我尝试了悲观和乐观锁定,但对我没有帮助,或者我可能只是没有正确实施它...您是否对此有一些建议。
我唯一能想到的解决方案是在执行这些代码行时锁定其他线程(Sidekiq 作业),但我不太确定如何实现这个,也不知道这是否是一个建议方法。
如有任何帮助,我将不胜感激。
编辑
在我的具体情况下,很难将电子邮件参数放入作业中,因为该作业比上面所说的要复杂一些。该作业实际上是一个导出脚本,其中作业的一部分是上面的代码。我不认为也可以将上面的功能分离到另一个单独的工作人员中......因为整个工作流程应该是串行的,并且不应并行/异步处理任何部分。此作业只是由另一个作业管理的作业之一,而另一个作业最终由 master 作业管理。
悲观锁定是您想要的,但仅适用于存在的记录 - 您不能将其与 new_record?
一起使用,因为数据库中还没有任何内容可锁定。
我设法通过以下方式解决了我的问题:
我发现我实际上可以在 Rails DB Uniqueness Partial Index 中添加一个 where
子句,因此我现在可以在其他并发的数据库级别为不同类型的用户设置唯一性条件如果已创建作业,现在将引发 ActiveRecord::RecordNotUnique
错误。
现在唯一的问题是 .find_or_initialize_by
和 .save
之间的代码,因为这些代码依赖于用户对象,其中总是只有一个并发作业应该始终获得 .new_record? == true
,然后其他并发作业应该触发 .persisted? == true
因为一个作业总是第一个创建它,但是......所有这些都不起作用,因为它只在行 .save
调用数据库唯一性索引验证的地方。因此,我设法通过在这些条件之前放置 .save
来解决这个问题,同时我为 .save
添加了一个救援块,如果它触发 ActiveRecord::RecordNotUnique
错误,以确保异步作业不会发生冲突。代码现在如下所示。
user = User.find_or_initialize_by(email: 'some-email@address.com')
begin
user.save
is_new_record = user.new_record?
is_persisted = user.persisted?
rescue ActiveRecord::RecordNotUnique => exception
MyJob.perform_later(params_hash)
end
if is_new_record
# do something if not yet created
elsif is_persisted
# do something if already created
end
我建议使用不同的架构来绕过这个问题。
生产者-工人模型怎么样,其中一个主 Sidekiq 进程获取电子邮件地址列表,然后为每封电子邮件生成一个工作 Sidekiq 进程? Sidekiq 通过专供 master 和 worker 进行通信的专用队列让这一切变得简单。
这样做,电子邮件地址就成为工作人员的一个输入参数,所以我们知道通过构造工作人员不会互相干扰数据。