Sidekiq: Find last job
I have two Sidekiq jobs. The first one loads a feed of articles in JSON and splits it into multiple jobs. It also creates a log and stores a start_time.
class LoadFeed
  include Sidekiq::Worker

  def perform(url)
    log = Log.create! start_time: Time.now, url: url
    articles = load_feed(url) # this one loads the feed
    articles.each do |article|
      ProcessArticle.perform_async(article, log.id)
    end
  end
end
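The second job processes one article and updates the end_time field of the previously created log, so I can find out how long the whole process (loading the feed, splitting it into jobs, processing the articles) took.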
class ProcessArticle
  include Sidekiq::Worker

  def perform(data, log_id)
    process(data)
    Log.find(log_id).update_attribute(:end_time, Time.now)
  end
end
But now I have some problems/questions:

1) Log.find(log_id).update_attribute(:end_time, Time.now) isn't atomic, and because of the async behaviour of the jobs, this could lead to incorrect end_time values. Is there a way to do an atomic update of a datetime field in MySQL with the current time?
2) The feed can get pretty long (~800k articles), and updating a value 800k times when only the last update matters seems like a lot of unnecessary work. Any ideas how to find out which job was the last one, and only update the end_time field in that job?
For 1), you can do the update with fewer queries and let MySQL work out the time:
Log.where(id: log_id).update_all('end_time = now()')
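To illustrate why it matters where the timestamp is taken, here is a minimal sketch in plain Ruby. It does not touch MySQL or ActiveRecord: LogicalClock and FakeLogRow are hypothetical stand-ins invented for this example, and the interleaving of the two jobs is hard-coded. It only shows the ordering problem, under the assumption that a worker can read the time and then be delayed before its write lands.

```ruby
# Logical clock standing in for wall-clock time: every read returns a later tick.
class LogicalClock
  def initialize
    @tick = 0
  end

  def now
    @tick += 1
  end
end

# Hypothetical in-memory stand-in for one row of the logs table.
class FakeLogRow
  attr_reader :end_time

  def initialize(clock)
    @clock = clock
  end

  # Mirrors update_attribute(:end_time, Time.now): the worker picked the value earlier.
  def write_worker_time(captured_time)
    @end_time = captured_time
  end

  # Mirrors update_all('end_time = now()'): the store picks the value while writing.
  def write_store_time
    @end_time = @clock.now
  end
end

# Returns [stored end_time, latest time any worker read].
def worker_side_race
  clock = LogicalClock.new
  row = FakeLogRow.new(clock)
  time_a = clock.now            # job A reads the clock first
  time_b = clock.now            # job B reads it afterwards
  row.write_worker_time(time_b) # B happens to write first
  row.write_worker_time(time_a) # A overwrites it with its older value
  [row.end_time, time_b]
end

# Returns the stored end_time after two store-stamped writes.
def store_side_race
  clock = LogicalClock.new
  row = FakeLogRow.new(clock)
  row.write_store_time # job B's write
  row.write_store_time # job A's write, stamped at write time
  row.end_time
end
```

In the first method the row ends up holding the older of the two timestamps, while in the second the later write always carries the later time. With a real database the window is narrowed rather than removed, since now() is evaluated per statement.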
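For 2), one way to solve this is to update the end time only once all articles have been processed, for example by having a boolean you can query. This does not reduce the number of queries, but it should perform better, since the log row is only written once: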
if feed.articles.needs_processing.none?
  Log.where(id: log_id).update_all('end_time = now()')
end
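A different way to spot the last job, swapped in here for the boolean query above, is a countdown: store the number of articles up front, have every job decrement it atomically, and let only the job that reaches zero write end_time. The sketch below simulates this in plain Ruby with threads. RemainingCounter and process_all are hypothetical names for this example; in a real setup the counter would have to live somewhere shared between workers (for instance Redis DECR or an SQL "remaining = remaining - 1" update), which is an assumption and not something shown in the question.

```ruby
# Hypothetical in-memory stand-in for a shared atomic counter.
class RemainingCounter
  def initialize(total)
    @remaining = total
    @lock = Mutex.new
  end

  # Atomically decrements the counter and returns the new value.
  def decrement
    @lock.synchronize { @remaining -= 1 }
  end
end

# Runs one thread per article and returns how many of them wrote end_time.
def process_all(article_count)
  counter = RemainingCounter.new(article_count)
  end_time_writes = Queue.new # thread-safe record of end_time updates

  threads = Array.new(article_count) do
    Thread.new do
      # process(data) would run here, before the decrement
      end_time_writes << Time.now if counter.decrement.zero?
    end
  end
  threads.each(&:join)

  end_time_writes.size
end
```

Calling process_all(200) returns 1: only the job that brings the counter to zero records an end time. One caveat to keep in mind is that Sidekiq retries failed jobs, so the decrement should be the last step of the job; otherwise a retried job could count itself twice.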
This is the problem that Sidekiq Pro's Batch feature solves. You create a set of jobs, and it calls your code when they have all completed.
class LoadFeed
  include Sidekiq::Worker

  def on_success(status, options)
    Log.find(options['log_id']).update_attribute(:end_time, Time.now)
  end

  def perform(url)
    log = Log.create! start_time: Time.now, url: url
    articles = load_feed(url) # this one loads the feed
    batch = Sidekiq::Batch.new
    batch.on(:success, self.class, 'log_id' => log.id)
    batch.jobs do
      articles.each do |article|
        ProcessArticle.perform_async(article, log.id)
      end
    end
  end
end