工人内部的 sidekiq perform_in(delay) 忽略延迟

sidekiq perform_in(delay) from within the worker ignores the delay

我的应用程序中的用户创建了 Transactions,我需要这些交易(以及当用户在特定时间内没有响应时将交易状态更改为 ignored 所创建的相关作业) 自行取消,除非用户执行 pay 操作。

我在一个示例中使用的方法是在状态更改为 approved 后使用 perform_async 进行以下调用,如果未及时响应则取消:

Class Transaction < ApplicationRecord
 #when approved
 def create_worker
  MyWorker.perform_async(self.id)
 end

 #if user responds in time, cancel the jobs and update the record to `paid` etc
 def cancel_worker
  jid = MyWorker.perform_async(self.id)
  MyWorker.cancel! jid
 end
end

按照建议 here and here,我在 worker 中添加了关于何时取消的附加功能。它看起来像这样:

class MyWorker
 include Sidekiq::Worker

 def perform(transaction_id)
  return if paid?
  transaction = Transaction.find transaction_id
  self.class.perform_in(1.minutes, transaction.ignore!)
 end

 def paid?
  Sidekiq.redis { |c| c.exists("paid-#{jid}") }
 end

 def self.cancel! jid
  Sidekiq.redis { |c| c.setex("paid-#{jid}", 86400, 1) }
 end
end

此代码导致以下终端输出:

2018-12-16T01:40:50.645Z 30530 TID-oxm547nes MyWorker JID-6c97e448fe30998235dee95d INFO: start
Changing transaction 4 approved to ignored (event: ignore!)
2018-12-16T01:40:50.884Z 30530 TID-oxm547nes MyWorker JID-6c97e448fe30998235dee95d INFO: done: 0.239 sec
2018-12-16T01:41:56.122Z 30530 TID-oxm547oag MyWorker JID-b46bb3b002e00f480a04be16 INFO: start
2018-12-16T01:41:56.125Z 30530 TID-oxm547oag MyWorker JID-b46bb3b002e00f480a04be16 INFO: fail: 0.003 sec
2018-12-16T01:41:56.126Z 30530 TID-oxm547oag WARN: {"context":"Job raised exception","job":{"class":"MyWorker","args":[true],"retry":true,"queue":"default","jid":"b46bb3b002e00f480a04be16","created_at":1544924450.884224,"enqueued_at":1544924516.107598,"error_message":"Couldn't find Transaction with 'id'=true","error_class":"ActiveRecord::RecordNotFound","failed_at":1544924516.125679,"retry_count":0},"jobstr":"{\"class\":\"MyWorker\",\"args\":[true],\"retry\":true,\"queue\":\"default\",\"jid\":\"b46bb3b002e00f480a04be16\",\"created_at\":1544924450.884224,\"enqueued_at\":1544924516.107598}"}

因此这会创建两个作业 - 一个具有 6c97e448fe30998235dee95d 的 jid 并立即将事务设置为 ignored,然后一个具有 b46bb3b002e00f480a04be16 的 jid 直接过去工人的 perform 函数中的早期 return (因为它不使用与第一份工作相同的 jid)。

我可以推测为什么这不能按我预期的方式工作的一个原因是对 MyWorker.cancel! 的调用无法获得我 想要 的工作人员的 jid在没有先创建数据库迁移来保存所述 jid 的情况下取消。

创建数据库迁移以包含工作人员的 jid 是否是确保 jid 在操作之间可访问的首选方法? id=true 是怎么进去的?正如上面的错误所说:Couldn't find Transaction with 'id'=true"

好的,我们一块块来。

  1. 此代码:

    self.class.perform_in(1.minute, transaction.ignore!)
    

    正在传递 ignore! 方法(在本例中为 true)的任何返回值作为作业的参数,这会导致异常。

    您应该确保传递正确的参数:

    self.class.perform_in(1.minute, transaction.tap(&:ignore!).id)
    
  2. 每次您调用 MyWorker.perform_async(或任何其他执行 class 的方法)时,您都在创建一个新工作,因此您没有得到相同的结果也就不足为奇了 jid

    您应该按照建议将首字母 jid 存储在交易 table 中,然后在付款时取回它以取消。否则作业 ID 将丢失。另一种方法是实际使用相同的 redis 来存储付费标志,但改为由事务键入。 c.exists("paid-#{transaction.id}")

  3. 您的代码不会等待 1 分钟来忽略事务,它只是立即忽略事务并设置自己在 1 分钟内再次执行。

    您可能想打电话给

    jid = MyWorker.perform_in(1.minute, transaction.id)
    

    直接来自 create_worker 方法。


更新

如果像我想象的那样,您正在使用某种持久状态机,那么只 "ignore unless complete" 而忘记取消作业就更容易了

class Transaction
  # I'm inventing a DSL here
  include SomeStateMachine

  state :accepted do
    event :ignore, to: :ignored
    event :confirm, to: :confirmed
  end
  state :ignored
  state :confirmed

  def create_worker
    # no need to track it
    MyWorker.perform_in(1.minute, id)
  end
end

class MyWorker
  include Sidekiq::Worker

  def perform(id)
    transaction = Transaction.find(id)
    transaction.ignore! if transaction.can_ignore?
  end
end

你可以让你的工作 运行,它会愉快地跳过任何不可忽略的事务。