在生产环境中,Sidekiq 将作业添加到历史记录中,但不执行它(Sidekiq + Redis + EC2 + Cloud66)

In production Sidekiq add job to history, but doesn't execute it (Sidekiq + Redis + EC2 + Cloud66)

我的应用程序有一个导入功能,可以执行 Sidekiq Worker 并导入一堆 CSV 行,将它们保存到我的数据库中。当我在我的本地机器上执行 Sidekiq 时,这工作正常,但是当我将代码部署到生产环境时,Sidekiq 只会正确执行一次作业。当我第二次使用 import 函数时,作业直接进入 Sidekiq 中的历史记录堆,worker 中的逻辑永远不会执行。这真的是 st运行ge,因为它没有抛出任何错误,就好像作业被正确执行了一样。对于暂存,我在 AWS Elastic Cache 中使用 Redis。

redis_version: 5.0.6

rails, "5.0.7"
sidekiq, "6.0.5"
sidekiq-failures, "1.0.0"
sidekiq-history, "0.0.11"
sidekiq-limit_fetch, "3.4.0"
sidekiq-pro, "5.0.1"
sidekiq-unique-jobs, "6.0.15"

我将不胜感激与您之前遇到的类似问题相关的任何提示,或者我可以做的任何其他事情来调试此问题。我已经 运行

Sidekiq.redis { |conn| conn.ping }
=> "PONG"

看起来 Redis 连接正常。

项目工作者

# frozen_string_literal: true

class ImportWorker
  include Sidekiq::Worker
  sidekiq_options queue: "import_worker", lock: :until_executed, retry: false

  def perform(import_id)
    import = Import.find_by(id: import_id)
    return if import.blank?

    path = import.file.expiring_url(10)
    file = open(path)

    csv = CSV.parse(file.read, headers: true)
    import.update!( number_of_lines_in_csv: csv.size,
                    import_started_at: DateTime.now)

    created_transactions = []
    csv.each do |row|
      guid = row["TransactionUniqueId"]
      next if guid.blank?

      existing_transaction = Transaction.find_by(transaction_unique_id: guid)
      next if existing_transaction.present?

      attributes = Transaction.convert_attributes(import, row).merge(imported_at: Time.now)

      transaction = Transaction.create!(attributes)
      created_transactions << [transaction.id, guid]
      Rails.logger.info "Transaction #{row["TransactionUniqueId"]} created."
    end

    import.update!(import_finished_at: DateTime.now,
                   imported:           true)
    send_mail(import_id, created_transactions)
  end

  def send_mail(import_id, created_transactions)
    ["email0@example.com", "email1@example.com"].each do |email|
      ImportTransactionsMailer.import_processed(import_id, email, created_transactions).deliver
    end
  end
end

编辑 1:抱歉,我忘了说我正在使用 Cloud66 部署我的应用程序,如果这有任何帮助的话。

sidekiq_options queue: "import_worker", lock: :until_executed, retry: false

如果出现错误怎么办?作业会被丢弃但唯一性锁会保留,以防止更多作业排队吗?

我找到问题所在了。因此,我在我的导入模型中的 after_create 挂钩中触发了我的 ImportWorker,如下所示。

# frozen_string_literal: true

class Import < ApplicationRecord
  has_many :transactions
  belongs_to :admin_user

  has_attached_file :file, s3_protocol: :https
  validates_attachment_content_type :file, content_type: ["text/plain",
                                                          "text/csv",
                                                          "application/vnd.ms-excel",
                                                          "application/octet-stream"]
  validates :file, attachment_presence: true
  has_paper_trail
  after_create :run_import_in_background

  def run_import_in_background
    ImportWorker.perform_async(id)
  end
end

但是当Worker执行第一行找到Import时

class ImportWorker
  include Sidekiq::Worker
  sidekiq_options queue: "import_worker", lock: :until_executed, retry: false

  def perform(import_id)
    import = Import.find_by(id: import_id)
    return if import.blank?
...

如果导入是 nil,它应该 return。问题是,我假设导入永远不会是 nil,因为这是从 after_create 挂钩触发的,但它实际上是 nil。当我将 return 行更改为 raise StandardError.new("Empty import object.") if import.blank? 时,工作人员开始失败。

所以我也将我的工作人员 sidekiq_optionsretry: false 更改为 retry: 3 并且在第二次尝试中工作人员执行正常,因为它现在可以找到具有指定 ID 的导入。所以我认为这是 after_create 挂钩和 Sidekiq 之间的某种同步问题。这也可能与在此设置中使用 S3 gem 有关。在 S3 中保存文件可能会导致在数据库中保存对象时出现一些延迟。

您可以在下面看到最终的 Worker 代码。

# frozen_string_literal: true

class ImportWorker
  include Sidekiq::Worker
  sidekiq_options queue: "import_worker", lock: :until_executed, retry: 3

  def perform(import_id)
    import = Import.find_by(id: import_id)
    raise StandardError.new("Empty import object.") if import.blank?
...