工人内部的 sidekiq perform_in(delay) 忽略延迟
sidekiq perform_in(delay) from within the worker ignores the delay
我的应用程序中的用户创建了 Transactions
,我需要这些交易(以及当用户在特定时间内没有响应时将交易状态更改为 ignored
所创建的相关作业) 自行取消,除非用户执行 pay
操作。
我在一个示例中使用的方法是在状态更改为 approved
后使用 perform_async
进行以下调用,如果未及时响应则取消:
Class Transaction < ApplicationRecord
#when approved
def create_worker
MyWorker.perform_async(self.id)
end
#if user responds in time, cancel the jobs and update the record to `paid` etc
def cancel_worker
jid = MyWorker.perform_async(self.id)
MyWorker.cancel! jid
end
end
按照建议 here and here,我在 worker 中添加了关于何时取消的附加功能。它看起来像这样:
class MyWorker
include Sidekiq::Worker
def perform(transaction_id)
return if paid?
transaction = Transaction.find transaction_id
self.class.perform_in(1.minutes, transaction.ignore!)
end
def paid?
Sidekiq.redis { |c| c.exists("paid-#{jid}") }
end
def self.cancel! jid
Sidekiq.redis { |c| c.setex("paid-#{jid}", 86400, 1) }
end
end
此代码导致以下终端输出:
2018-12-16T01:40:50.645Z 30530 TID-oxm547nes MyWorker JID-6c97e448fe30998235dee95d INFO: start
Changing transaction 4 approved to ignored (event: ignore!)
2018-12-16T01:40:50.884Z 30530 TID-oxm547nes MyWorker JID-6c97e448fe30998235dee95d INFO: done: 0.239 sec
2018-12-16T01:41:56.122Z 30530 TID-oxm547oag MyWorker JID-b46bb3b002e00f480a04be16 INFO: start
2018-12-16T01:41:56.125Z 30530 TID-oxm547oag MyWorker JID-b46bb3b002e00f480a04be16 INFO: fail: 0.003 sec
2018-12-16T01:41:56.126Z 30530 TID-oxm547oag WARN: {"context":"Job raised exception","job":{"class":"MyWorker","args":[true],"retry":true,"queue":"default","jid":"b46bb3b002e00f480a04be16","created_at":1544924450.884224,"enqueued_at":1544924516.107598,"error_message":"Couldn't find Transaction with 'id'=true","error_class":"ActiveRecord::RecordNotFound","failed_at":1544924516.125679,"retry_count":0},"jobstr":"{\"class\":\"MyWorker\",\"args\":[true],\"retry\":true,\"queue\":\"default\",\"jid\":\"b46bb3b002e00f480a04be16\",\"created_at\":1544924450.884224,\"enqueued_at\":1544924516.107598}"}
因此这会创建两个作业 - 一个具有 6c97e448fe30998235dee95d
的 jid 并立即将事务设置为 ignored
,然后一个具有 b46bb3b002e00f480a04be16
的 jid 直接过去工人的 perform
函数中的早期 return (因为它不使用与第一份工作相同的 jid)。
我可以推测为什么这不能按我预期的方式工作的一个原因是对 MyWorker.cancel!
的调用无法获得我 想要 的工作人员的 jid在没有先创建数据库迁移来保存所述 jid 的情况下取消。
创建数据库迁移以包含工作人员的 jid 是否是确保 jid 在操作之间可访问的首选方法? id=true
是怎么进去的?正如上面的错误所说:Couldn't find Transaction with 'id'=true"
好的,我们一块块来。
此代码:
self.class.perform_in(1.minute, transaction.ignore!)
正在传递 ignore!
方法(在本例中为 true
)的任何返回值作为作业的参数,这会导致异常。
您应该确保传递正确的参数:
self.class.perform_in(1.minute, transaction.tap(&:ignore!).id)
每次您调用 MyWorker.perform_async
(或任何其他执行 class 的方法)时,您都在创建一个新工作,因此您没有得到相同的结果也就不足为奇了 jid
。
您应该按照建议将首字母 jid
存储在交易 table 中,然后在付款时取回它以取消。否则作业 ID 将丢失。另一种方法是实际使用相同的 redis 来存储付费标志,但改为由事务键入。 c.exists("paid-#{transaction.id}")
您的代码不会等待 1 分钟来忽略事务,它只是立即忽略事务并设置自己在 1 分钟内再次执行。
您可能想打电话给
jid = MyWorker.perform_in(1.minute, transaction.id)
直接来自 create_worker
方法。
更新
如果像我想象的那样,您正在使用某种持久状态机,那么只 "ignore unless complete" 而忘记取消作业就更容易了
class Transaction
# I'm inventing a DSL here
include SomeStateMachine
state :accepted do
event :ignore, to: :ignored
event :confirm, to: :confirmed
end
state :ignored
state :confirmed
def create_worker
# no need to track it
MyWorker.perform_in(1.minute, id)
end
end
class MyWorker
include Sidekiq::Worker
def perform(id)
transaction = Transaction.find(id)
transaction.ignore! if transaction.can_ignore?
end
end
你可以让你的工作 运行,它会愉快地跳过任何不可忽略的事务。
我的应用程序中的用户创建了 Transactions
,我需要这些交易(以及当用户在特定时间内没有响应时将交易状态更改为 ignored
所创建的相关作业) 自行取消,除非用户执行 pay
操作。
我在一个示例中使用的方法是在状态更改为 approved
后使用 perform_async
进行以下调用,如果未及时响应则取消:
Class Transaction < ApplicationRecord
#when approved
def create_worker
MyWorker.perform_async(self.id)
end
#if user responds in time, cancel the jobs and update the record to `paid` etc
def cancel_worker
jid = MyWorker.perform_async(self.id)
MyWorker.cancel! jid
end
end
按照建议 here and here,我在 worker 中添加了关于何时取消的附加功能。它看起来像这样:
class MyWorker
include Sidekiq::Worker
def perform(transaction_id)
return if paid?
transaction = Transaction.find transaction_id
self.class.perform_in(1.minutes, transaction.ignore!)
end
def paid?
Sidekiq.redis { |c| c.exists("paid-#{jid}") }
end
def self.cancel! jid
Sidekiq.redis { |c| c.setex("paid-#{jid}", 86400, 1) }
end
end
此代码导致以下终端输出:
2018-12-16T01:40:50.645Z 30530 TID-oxm547nes MyWorker JID-6c97e448fe30998235dee95d INFO: start
Changing transaction 4 approved to ignored (event: ignore!)
2018-12-16T01:40:50.884Z 30530 TID-oxm547nes MyWorker JID-6c97e448fe30998235dee95d INFO: done: 0.239 sec
2018-12-16T01:41:56.122Z 30530 TID-oxm547oag MyWorker JID-b46bb3b002e00f480a04be16 INFO: start
2018-12-16T01:41:56.125Z 30530 TID-oxm547oag MyWorker JID-b46bb3b002e00f480a04be16 INFO: fail: 0.003 sec
2018-12-16T01:41:56.126Z 30530 TID-oxm547oag WARN: {"context":"Job raised exception","job":{"class":"MyWorker","args":[true],"retry":true,"queue":"default","jid":"b46bb3b002e00f480a04be16","created_at":1544924450.884224,"enqueued_at":1544924516.107598,"error_message":"Couldn't find Transaction with 'id'=true","error_class":"ActiveRecord::RecordNotFound","failed_at":1544924516.125679,"retry_count":0},"jobstr":"{\"class\":\"MyWorker\",\"args\":[true],\"retry\":true,\"queue\":\"default\",\"jid\":\"b46bb3b002e00f480a04be16\",\"created_at\":1544924450.884224,\"enqueued_at\":1544924516.107598}"}
因此这会创建两个作业 - 一个具有 6c97e448fe30998235dee95d
的 jid 并立即将事务设置为 ignored
,然后一个具有 b46bb3b002e00f480a04be16
的 jid 直接过去工人的 perform
函数中的早期 return (因为它不使用与第一份工作相同的 jid)。
我可以推测为什么这不能按我预期的方式工作的一个原因是对 MyWorker.cancel!
的调用无法获得我 想要 的工作人员的 jid在没有先创建数据库迁移来保存所述 jid 的情况下取消。
创建数据库迁移以包含工作人员的 jid 是否是确保 jid 在操作之间可访问的首选方法? id=true
是怎么进去的?正如上面的错误所说:Couldn't find Transaction with 'id'=true"
好的,我们一块块来。
此代码:
self.class.perform_in(1.minute, transaction.ignore!)
正在传递
ignore!
方法(在本例中为true
)的任何返回值作为作业的参数,这会导致异常。您应该确保传递正确的参数:
self.class.perform_in(1.minute, transaction.tap(&:ignore!).id)
每次您调用
MyWorker.perform_async
(或任何其他执行 class 的方法)时,您都在创建一个新工作,因此您没有得到相同的结果也就不足为奇了jid
。您应该按照建议将首字母
jid
存储在交易 table 中,然后在付款时取回它以取消。否则作业 ID 将丢失。另一种方法是实际使用相同的 redis 来存储付费标志,但改为由事务键入。c.exists("paid-#{transaction.id}")
您的代码不会等待 1 分钟来忽略事务,它只是立即忽略事务并设置自己在 1 分钟内再次执行。
您可能想打电话给
jid = MyWorker.perform_in(1.minute, transaction.id)
直接来自
create_worker
方法。
更新
如果像我想象的那样,您正在使用某种持久状态机,那么只 "ignore unless complete" 而忘记取消作业就更容易了
class Transaction
# I'm inventing a DSL here
include SomeStateMachine
state :accepted do
event :ignore, to: :ignored
event :confirm, to: :confirmed
end
state :ignored
state :confirmed
def create_worker
# no need to track it
MyWorker.perform_in(1.minute, id)
end
end
class MyWorker
include Sidekiq::Worker
def perform(id)
transaction = Transaction.find(id)
transaction.ignore! if transaction.can_ignore?
end
end
你可以让你的工作 运行,它会愉快地跳过任何不可忽略的事务。