Sidekiq: Find last job

I have two Sidekiq jobs. The first one loads a feed of articles in JSON and splits it into multiple jobs. It also creates a log and stores a start_time.

class LoadFeed
  include Sidekiq::Worker

  def perform url
    log = Log.create! start_time: Time.now, url: url
    articles = load_feed(url) # this one loads the feed
    articles.each do |article|
      ProcessArticle.perform_async(article, log.id)
    end
  end
end

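The second job processes one article and updates the end_time field of the previously created log, so I can find out how long the whole process (loading the feed, splitting it into jobs, processing the articles) took.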

class ProcessArticle
  include Sidekiq::Worker

  def perform data, log_id
    process(data)
    Log.find(log_id).update_attribute(:end_time, Time.now)
  end
end

But now I have some problems/questions:

  1. Log.find(log_id).update_attribute(:end_time, Time.now) isn't atomic, and because of the async behaviour of the jobs, this could lead to incorrect end_time values. Is there a way to do an atomic update of a datetime field in MySQL with the current time?
  2. The feed can get pretty long (~ 800k articles) and updating a value 800k times when you would just need the last one seems like a lot of unnecessary work. Any ideas how to find out which one was the last job, and only update the end_time field in this job?

For 1), you could do the update with one less query and let MySQL supply the time:

Log.where(id: log_id).update_all('end_time = now()')
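
If the timestamp is computed in Ruby instead of by MySQL, a delayed job could still overwrite a newer end_time with an older one. One way to guard against that is to only ever move the value forward. The following is a minimal sketch of that rule in plain Ruby, not code from the original post: `EndTimeRecord` and `advance` are hypothetical names, and a Mutex stands in for the row lock the database would take.

```ruby
# Hypothetical in-memory stand-in for one row of the logs table.
class EndTimeRecord
  attr_reader :end_time

  def initialize
    @end_time = nil
    @lock = Mutex.new # plays the role of the database row lock
  end

  # Stores candidate only if it is later than the stored value.
  # Returns true when the stored value changed, false otherwise.
  def advance(candidate)
    @lock.synchronize do
      return false if @end_time && @end_time >= candidate

      @end_time = candidate
      true
    end
  end
end
```

With ActiveRecord, the same rule could probably be expressed as a conditional update, for example `Log.where(id: log_id).where('end_time IS NULL OR end_time < ?', t).update_all(end_time: t)`, assuming `t` is the time captured by the job.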

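For 2), one way to solve this is to update the end time only once all articles have been processed, for example by keeping a boolean flag on each article that you can query. This doesn't reduce the number of queries, but it should perform better, since most jobs only read and a single job writes the end time.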

if feed.articles.needs_processing.none?
  Log.where(id: log_id).update_all('end_time = now()')
end
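
A different way to detect the last job, swapped in here as an alternative to the boolean check, is a shared counter that each job decrements atomically when it finishes; only the job that sees zero writes end_time. The sketch below simulates this in plain Ruby under stated assumptions: `RemainingJobs` and `simulate_jobs` are hypothetical names, threads stand in for Sidekiq jobs, and a Mutex-guarded integer stands in for a shared store such as a Redis key or a counter column.

```ruby
# Hypothetical stand-in for a shared atomic counter (in a real setup this
# might be a Redis key or a counter column, not a Ruby object).
class RemainingJobs
  def initialize(total)
    @remaining = total
    @lock = Mutex.new
  end

  # Atomically decrements the counter and returns the new value,
  # so exactly one caller observes zero.
  def decrement
    @lock.synchronize { @remaining -= 1 }
  end
end

# Runs `total` simulated jobs on threads and returns how many of them
# saw the counter reach zero, i.e. how many would write end_time.
def simulate_jobs(total)
  counter = RemainingJobs.new(total)
  end_time_writes = Queue.new

  threads = Array.new(total) do
    Thread.new do
      # process(data) would run here, before the decrement
      end_time_writes << Time.now if counter.decrement.zero?
    end
  end
  threads.each(&:join)

  end_time_writes.size
end
```

The counter has to be initialised to the number of articles before any job is enqueued, and the decrement should be the last step of a job, so that a job which fails and is retried does not count twice.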

This is the problem that Sidekiq Pro's Batch feature solves. You create a set of jobs, and it calls your code when they have all completed.

class LoadFeed
  include Sidekiq::Worker

  def on_success(status, options)
    Log.find(options['log_id']).update_attribute(:end_time, Time.now)
  end

  def perform url
    log = Log.create! start_time: Time.now, url: url
    articles = load_feed(url) # this one loads the feed
    batch = Sidekiq::Batch.new
    batch.on(:success, self.class, 'log_id' => log.id)
    batch.jobs do
      articles.each do |article|
        ProcessArticle.perform_async(article, log.id)
      end
    end
  end
end