批量创建记录时的 Sidekiq 作业幂等性

Sidekiq job idempotency when bulk-creating records

我们如何以幂等方式批量创建记录?

在下面的示例中,如果一切 运行 都符合预期,则应创建 100,500 张票。但是,假设由于某种未知原因,至少有一个作业 运行 两次。

  1. 我们如何保证作业只创建请求的确切数量的工单,而不是更多?
  2. 我们可以在没有竞争条件风险的情况下做到这一点吗?

上下文

我正在尝试快速批量创建 100k+ 记录,Sidekiq 最佳实践建议作业应该是幂等的,即它们应该能够 运行 多次并且最终结果应该是相同的。

就我而言,我正在执行以下操作:

例子

我们有一个 raffles table:

id number_of_tickets_requested

创建新的 raffle 记录后,我们希望在 tickets table:

中批量创建抽奖券
id code raffle_id

假设我们刚刚用 number_of_tickets_requested: 100500 创建了一个新的抽奖活动。

(免责声明:我对示例中的内容进行了硬编码以使其更易于理解。)

我目前的尝试

在抽奖模式中:

  MAX_TICKETS_PER_JOB = 1000

  after_create :queue_jobs_to_batch_create_tickets

  def queue_jobs_to_batch_create_tickets
    100.times { BatchCreateTicketsJob.perform_later(raffle, 1000) }
    BatchCreateTicketsJob.perform_later(raffle, 500)
  end

在 BatchCreateTicketsJob 中:

  def perform(raffle, number_of_tickets_to_create)
    BatchCreateTicketsService.call(raffle, number_of_tickets_to_create)
  end

在 BatchCreateTicketsService 中:

  def call
    Raffle.transaction do
      # Uses insert_all to create all tickets in 1 db query
      # It skips Rails validations so is very fast
      # It only creates records that pass the db validations
      result = Ticket.insert_all(tickets)

      unless result.count == number_of_tickets_to_create
        raise ActiveRecord::Rollback
      end
    end
  end

  private

  def tickets
    result = []
    number_of_tickets_to_create.times { result << new_ticket }
    result
  end

  def new_ticket
    {
      code: "#{SecureRandom.hex(6)}".upcase,
      raffle_id: raffle.id
    }
  end

作为参考,我最终选择了:

  • with_lock 以防止竞争条件;
  • 保证原子性的事务;
  • 在抽奖活动中新增 tickets_count 计数器列 table 以确保幂等性。
class BatchCreateTicketsService < ApplicationService
  attr_reader :raffle, :num_tickets

  def initialize(raffle, num_tickets)
    @raffle = raffle
    @num_tickets = num_tickets
  end

  def call
    raffle.with_lock do
      Raffle.transaction do
        create_tickets
      end
    end
  end

  private

  def create_tickets
    result = Ticket.insert_all(tickets)

    raise StandardError unless result.count == num_tickets

    raffle.tickets_count += result.count
    raffle.save
  end

  def tickets
    result = []
    num_tickets.times { result << new_ticket }
    result
  end

  def new_ticket
    {
      code: "#{SecureRandom.hex(6)}".upcase,
      raffle_id: raffle.id
    }
  end
end