Worker 劫持 Active Record 模型

Worker hijacking Active Record model

我是 ruby-大三学生。 我的应用程序允许用户输入联系人 and/or 上传 CSV 文件。

我正在使用以下版本:

  ruby "2.3.0"
  gem "rails", "4.2.5.1" gem "pg", "0.17.1" # postgresql database 
  gem "delayed_job_active_record", ">= 4.0.0.beta1" # background job
  processing gem "delayed_job_web", ">= 1.2.0" # web interface for delayed job

同时使用:

> class CsvUploader < CarrierWave::Uploader::Base

def store_dir "uploads/#{model.class.to_s.underscore}/#{mounted_as}/#{model.id}" end

这是工人:

class ImportCsvFileWorker

def self.perform(csv_file_id) csv_file = CsvFile.find(csv_file_id)

csv_file.import!
csv_file.send_report!   end

end

我正在使用smarecsv解析服务

def process_csv parser = ::ImportData::SmartCsvParser.new(csv_file.file_url)

parser.each do |smart_row|
  csv_file.increment!(:total_parsed_records)
  begin
    self.contact = process_row(smart_row)
  rescue => e
    row_parse_error(smart_row, e)
  end
end   rescue => e # parser error or unexpected error
csv_file.save_import_error(e)   end

是否 delayed_job 锁定了 User/Contact 的数据库,所以我无法通过应用程序添加任何联系人?

在本地,应用程序 frozen/hanging 或似乎被锁定,直到后台 delayed_job 完成(顺便说一句,如果我在 Heroku 上 运行,它会导致 H12 错误,但我认为我需要修复先在本地发行)。只是想了解 - 是什么导致它被锁定?它应该这样做吗?它是代码吗(CSV 文件的业务逻辑和添加联系人的视图都是独立工作的)?但是App端如果有后台作业运行ning或者是Active Record的处理方式就不行了。有没有解决的办法?

我没有隔离它,但我怀疑如果任何后台作业正在 运行ning,应用程序将变得不可用。

我已尝试包含所有相关事实 - 如果需要任何进一步的详细信息,请告诉我。非常感谢您的帮助。

更新 - 我发现我有一个似乎锁定了所有联系人的 ContactMergingService。如果我在下面注释掉此服务,我的应用程序不会挂起。

所以我的问题是其他选项是什么 = 在添加联系人之前,我想做的是找到所有现有的相同电子邮件地址(如果找到它,我会附加联系方式)。我如何在不锁定数据库的情况下执行此操作?

是因为我用的是'find'方法吗?有没有更好的方法?

> class ContactMergingService
> 
>   attr_reader :new_contact, :user
> 
>   def initialize(user, new_contact, _existing_records)
>     @user = user
>     @new_contact = new_contact
>     @found_records = matching_emails_and_phone_numbers   
>   end
> 
>   def perform
>     Rails.logger.info "[CSV.merging] Checking if new contact matches existing contact..."
>     if (existing_contact = existing_contact())
>       Rails.logger.info "[CSV.merging] Contact match found."
>       merge(existing_contact, new_contact)
>       existing_contact
>     else
>       Rails.logger.info "[CSV.merging] No contact match found."
>       new_contact
>     end   end
> 
>   private
> 
>   def existing_contact
>     Rails.logger.info "[CSV.merging] Found records: #{@found_records.inspect}"
>     if @found_records.present?
>       @user.contacts.find @found_records.first.owner_id # Fetch first owner
>     end   end
> 
>   def merge(existing_contact, new_contact)
>     Rails.logger.info "[CSV.merging] Merging with existing contact (ID: #{existing_contact.id})..."
>     merge_records(existing_contact, new_contact)   end
> 
>   def merge_records(existing_relation, new_relation)
>     existing_relation.attributes do |field, value|
>       if value.blank? && new_relation[field].present?
>         existing_relation[field] = new_relation[field]
>       end
>     end
>     new_relation.email_addresses.each do |email_address|
>       Rails.logger.info "[CSV.merging.emails] Email: #{email_address.inspect}"
>       if existing_relation.email_addresses.find_by(email: email_address.email)
>         Rails.logger.info "[CSV.merging.emails] Email address exists."
>       else
>         Rails.logger.info "[CSV.merging.emails] Email does not already exist. Saving..."
>         email_address.owner = existing_relation
>         email_address.save!
>       end
>     end
>     new_relation.phone_numbers.each do |phone_number|
>       Rails.logger.info "[CSV.merging.phone] Phone Number: #{phone_number.inspect}"
>       if existing_relation.phone_numbers.find_by(number: phone_number.number)
>         Rails.logger.info "[CSV.merging.phone] Phone number exists."
>       else
>         Rails.logger.info "[CSV.merging.phone] Phone Number does not already exist. Saving..."
>         phone_number.owner = existing_relation
>         phone_number.save!
>       end
>     end   end
> 
>   def matching_emails_and_phone_numbers
>     records = []
>     if @user
>       records << matching_emails
>       records << matching_phone_numbers
>       Rails.logger.info "[CSV.merging] merged records: #{records.inspect}"
>       records.flatten!
>       Rails.logger.info "[CSV.merging] flattened records: #{records.inspect}"
>       records.compact!
>       Rails.logger.info "[CSV.merging] compacted records: #{records.inspect}"
>     end
>     records   end
> 
>   def matching_emails
>     existing_emails = []
>     new_contact_emails = @new_contact.email_addresses
>     Rails.logger.info "[CSV.merging] new_contact_emails: #{new_contact_emails.inspect}"
>     new_contact_emails.each do |email|
>       Rails.logger.info "[CSV.merging] Checking for a match on email: #{email.inspect}..."
>       if existing_email = @user.contact_email_addresses.find_by(email: email.email, primary: email.primary)
>         Rails.logger.info "[CSV.merging] Found a matching email"
>         existing_emails << existing_email
>       else
>         Rails.logger.info "[CSV.merging] No match found"
>         false
>       end
>     end
>     existing_emails   end
> 
>   def matching_phone_numbers
>     existing_phone_numbers = []
>     @new_contact.phone_numbers.each do |phone_number|
>       Rails.logger.info "[CSV.merging] Checking for a match on phone_number: #{phone_number.inspect}..."
>       if existing_phone_number = @user.contact_phone_numbers.find_by(number: phone_number.number)
>         Rails.logger.info "[CSV.merging] Found a matching phone number"
>         existing_phone_numbers << existing_phone_number
>       else
>         Rails.logger.info "[CSV.merging] No match found"
>         false
>       end
>     end
>     existing_phone_numbers   end
> 
>   def clean_phone_number(number)
>     number.gsub(/[\s\-\(\)]+/, "")   end
> 
> end

您可以尝试类似的方法:

 Thread.new do
    ActiveRecord::Base.transaction do   
      User.import(user_data)
    end
    ActiveRecord::Base.connection.close
 end

在您的 CVS 导入代码中。

我们得出结论,问题的原因是 CsvParsingService#perform 运行s 将 AccessShareLocks 放在数据库中的某些表上(我们认为是 Contacts、EmailAddresses、PhoneNumbers,也许还有 Users)。

锁一直存在,直到该方法完成。任何其他试图访问这些锁定表之一的请求都将等待数据库解锁。因为该方法解析给定上传的每一行 csv_file,它需要长达 90 分钟才能 运行.

任何尝试访问这些锁定表格之一的应用程序请求都将停止并等待表格解锁。因为 Heroku 将在 30 秒后切断请求,这就是产生 H12 错误的原因(在应用程序端)。

问题的原因是 gem state-machine_active 记录默认将每个状态转换包装在事务中。

工作人员正在通过 运行ning csv_file.import! 调用解析服务,这会触发 csv_file 状态机上的转换,然后调用 CsvParsingService 并解析每一行。由于状态机将所有内容包装在一个事务中,因此在状态转换完成之前不会提交任何内容。

通过将gem更新到0.4.0pre版本,并在CsvFile模型中的状态机中添加选项use_transactions: false,调用.import时不再锁定数据库!和处理。