如何为 ActiveRecord 模型对象找到关联的 Resque 作业?
How to find associated Resque jobs for an ActiveRecord model object?
我需要能够为模型对象找到排队的 and/or 个工作作业 and/or 个失败的作业,例如当模型对象被销毁时我们想要找到所有并且要么决定不删除或销毁作业(有条件地)。
在我重新发明轮子之前,有推荐的方法吗?
示例:
如果您想创建一个 before_destroy
回调,在销毁对象(排队和失败的作业)时销毁所有作业,并且仅在没有工作作业时才销毁
我想为这个示例用例做的一些伪代码:
报表模型
class Report < ActiveRecord::Base
before_destroy :check_if_working_jobs, :destroy_queued_and_failed_jobs
def check_if_working_jobs
# find all working jobs related to this report object
working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id)
return false unless working_jobs.empty?
end
def destroy_queued_and_failed_jobs
# find all jobs related to this report object
queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id)
failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id)
# destroy/remove all jobs found
(queued_jobs + failed_jobs).each do |job|
# destroy the job here ... commands?
end
end
end
resque/redis 支持的作业的报告处理工作人员class
class ProcessReportWorker
# find the jobs by report id which is one of the arguments for the job?
# envisioned as separate methods so they can be used independently as needed
def self.find_queued_jobs_by_report_id(id)
# parse all jobs in all queues to find based on the report id argument?
end
def self.find_working_jobs_by_report_id(id)
# parse all jobs in working queues to find based on the report id argument?
end
def self.find_failed_jobs_by_report_id(id)
# parse all jobs in failed queue to find based on the report id argument?
end
end
这种方法是否符合需要发生的事情?
通过模型对象 id 找到排队或工作的作业然后销毁上面缺少的部分是什么?
是否已经有方法可以找到 and/or destroy by 我在文档或搜索中遗漏的关联模型对象 ID?
更新: 修改了用法示例,仅使用 working_jobs 作为检查我们是否应该删除的方式,而不是建议我们尝试删除 working_jobs也。 (因为删除工作作业比简单地删除 redis 键条目更复杂)
这里很安静,没有任何回应,所以我设法按照我在问题中指出的路径自己解决了这一天。可能有更好的解决方案或其他方法可用,但到目前为止,这似乎已经完成了工作。如果这里使用的方法有更好的选择或者是否可以进一步改进,请随时发表评论。
这里的总体方法是您需要搜索所有作业(排队、工作、失败)并仅过滤掉 class
和 queue
相关且与对象匹配的作业在 args 数组的正确索引位置中记录您要查找的 ID。例如(在确认 class
和 queue
匹配后)如果参数位置 0 是对象 id 所在的位置,那么您可以测试看 args[0]
是否匹配对象 id.
本质上,如果满足以下条件,则作业与对象 ID 相关联:job_class == class.name && job_queue == @queue && job_args[OBJECT_ID_ARGS_INDEX].to_i == object_id
- 排队作业:要查找所有排队作业,您需要收集所有 redis
带有名为
queue:#{@queue}
的键的条目,其中 @queue 是
您的工作人员 class 正在使用的队列名称。相应修改
如果您使用多个队列,则循环多个队列
一个特定的工人 class。 Resque.redis.lrange("queue:#{@queue}",0,-1)
- 失败的作业:要查找所有排队的作业,您需要收集所有 redis
键名为
failed
的条目(除非您使用多个
故障队列或默认设置以外的一些)。 Resque.redis.lrange("failed",0,-1)
- 工作职位: 要查找所有工作职位,您可以使用
Resque.workers
其中包含所有工人的数组和 运行 的工作。 Resque.workers.map(&:job)
- Job: 上述每个列表中的每个作业都将是一个编码哈希。你
可以使用
Resque.decode(job)
将作业解码为 ruby 散列。
- Class 和 args: 对于 排队作业 ,
class
和 args
键是 job["class"]
和 job["args"]
。对于失败和工作,这些是job["payload"]["class"]
和job["payload"]["args"]
。
- 队列: 对于找到的每个 失败 和 工作作业,队列将是
job["queue"]
。在测试对象 ID 的 args 列表之前,您只需要匹配 class
和 queue
的作业。您的排队作业 列表已经限制为您收集的队列。
下面是示例工作人员 class 和模型方法,用于查找(和删除)与示例模型对象(报告)关联的工作。
resque/redis 支持的作业的报告处理工作人员class
class ProcessReportWorker
# queue name
@queue = :report_processing
# tell the worker class where the report id is in the arguments list
REPORT_ID_ARGS_INDEX = 0
# <snip> rest of class, not needed here for this answer
# find jobs methods - find by report id (report is the 'associated' object)
def self.find_queued_jobs_by_report_id report_id
queued_jobs(@queue).select do |job|
is_job_for_report? :queued, job, report_id
end
end
def self.find_failed_jobs_by_report_id report_id
failed_jobs.select do |job|
is_job_for_report? :failed, job, report_id
end
end
def self.find_working_jobs_by_report_id report_id
working_jobs.select do |worker,job|
is_job_for_report? :working, job, report_id
end
end
# association test method - determine if this job is associated
def self.is_job_for_report? state, job, report_id
attributes = job_attributes(state, job)
attributes[:klass] == self.name &&
attributes[:queue] == @queue &&
attributes[:args][REPORT_ID_ARGS_INDEX].to_i == report_id
end
# remove jobs methods
def self.remove_failed_jobs_by_report_id report_id
find_failed_jobs_by_report_id(report_id).each do |job|
Resque::Failure.remove(job["index"])
end
end
def self.remove_queued_jobs_by_report_id report_id
find_queued_jobs_by_report_id(report_id).each do |job|
Resque::Job.destroy(@queue,job["class"],*job["args"])
end
end
# reusable methods - these methods could go elsewhere and be reusable across worker classes
# job attributes method
def self.job_attributes(state, job)
if state == :queued && job["args"].present?
args = job["args"]
klass = job["class"]
elsif job["payload"] && job["payload"]["args"].present?
args = job["payload"]["args"]
klass = job["payload"]["class"]
else
return {args: nil, klass: nil, queue: nil}
end
{args: args, klass: klass, queue: job["queue"]}
end
# jobs list methods
def self.queued_jobs queue
Resque.redis.lrange("queue:#{queue}", 0, -1)
.collect do |job|
job = Resque.decode(job)
job["queue"] = queue # for consistency only
job
end
end
def self.failed_jobs
Resque.redis.lrange("failed", 0, -1)
.each_with_index.collect do |job,index|
job = Resque.decode(job)
job["index"] = index # required if removing
job
end
end
def self.working_jobs
Resque.workers.zip(Resque.workers.map(&:job))
.reject { |w, j| w.idle? || j['queue'].nil? }
end
end
那么报表模型的使用示例就变成了
class Report < ActiveRecord::Base
before_destroy :check_if_working_jobs, :remove_queued_and_failed_jobs
def check_if_working_jobs
# find all working jobs related to this report object
working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id)
return false unless working_jobs.empty?
end
def remove_queued_and_failed_jobs
# find all jobs related to this report object
queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id)
failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id)
# extra code and conditionals here for example only as all that is really
# needed is to call the remove methods without first finding or checking
unless queued_jobs.empty?
ProcessReportWorker.remove_queued_jobs_by_report_id(self.id)
end
unless failed_jobs.empty?
ProcessReportWorker.remove_failed_jobs_by_report_id(self.id)
end
end
end
如果您为 worker class 使用多个队列,或者如果您有多个失败队列,则需要修改解决方案。此外,还使用了 redis
故障后端。如果使用不同的故障后端,则可能需要进行更改。
我需要能够为模型对象找到排队的 and/or 个工作作业 and/or 个失败的作业,例如当模型对象被销毁时我们想要找到所有并且要么决定不删除或销毁作业(有条件地)。
在我重新发明轮子之前,有推荐的方法吗?
示例:
如果您想创建一个 before_destroy
回调,在销毁对象(排队和失败的作业)时销毁所有作业,并且仅在没有工作作业时才销毁
我想为这个示例用例做的一些伪代码:
报表模型
class Report < ActiveRecord::Base
before_destroy :check_if_working_jobs, :destroy_queued_and_failed_jobs
def check_if_working_jobs
# find all working jobs related to this report object
working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id)
return false unless working_jobs.empty?
end
def destroy_queued_and_failed_jobs
# find all jobs related to this report object
queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id)
failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id)
# destroy/remove all jobs found
(queued_jobs + failed_jobs).each do |job|
# destroy the job here ... commands?
end
end
end
resque/redis 支持的作业的报告处理工作人员class
class ProcessReportWorker
# find the jobs by report id which is one of the arguments for the job?
# envisioned as separate methods so they can be used independently as needed
def self.find_queued_jobs_by_report_id(id)
# parse all jobs in all queues to find based on the report id argument?
end
def self.find_working_jobs_by_report_id(id)
# parse all jobs in working queues to find based on the report id argument?
end
def self.find_failed_jobs_by_report_id(id)
# parse all jobs in failed queue to find based on the report id argument?
end
end
这种方法是否符合需要发生的事情?
通过模型对象 id 找到排队或工作的作业然后销毁上面缺少的部分是什么?
是否已经有方法可以找到 and/or destroy by 我在文档或搜索中遗漏的关联模型对象 ID?
更新: 修改了用法示例,仅使用 working_jobs 作为检查我们是否应该删除的方式,而不是建议我们尝试删除 working_jobs也。 (因为删除工作作业比简单地删除 redis 键条目更复杂)
这里很安静,没有任何回应,所以我设法按照我在问题中指出的路径自己解决了这一天。可能有更好的解决方案或其他方法可用,但到目前为止,这似乎已经完成了工作。如果这里使用的方法有更好的选择或者是否可以进一步改进,请随时发表评论。
这里的总体方法是您需要搜索所有作业(排队、工作、失败)并仅过滤掉 class
和 queue
相关且与对象匹配的作业在 args 数组的正确索引位置中记录您要查找的 ID。例如(在确认 class
和 queue
匹配后)如果参数位置 0 是对象 id 所在的位置,那么您可以测试看 args[0]
是否匹配对象 id.
本质上,如果满足以下条件,则作业与对象 ID 相关联:job_class == class.name && job_queue == @queue && job_args[OBJECT_ID_ARGS_INDEX].to_i == object_id
- 排队作业:要查找所有排队作业,您需要收集所有 redis
带有名为
queue:#{@queue}
的键的条目,其中 @queue 是 您的工作人员 class 正在使用的队列名称。相应修改 如果您使用多个队列,则循环多个队列 一个特定的工人 class。Resque.redis.lrange("queue:#{@queue}",0,-1)
- 失败的作业:要查找所有排队的作业,您需要收集所有 redis
键名为
failed
的条目(除非您使用多个 故障队列或默认设置以外的一些)。Resque.redis.lrange("failed",0,-1)
- 工作职位: 要查找所有工作职位,您可以使用
Resque.workers
其中包含所有工人的数组和 运行 的工作。Resque.workers.map(&:job)
- Job: 上述每个列表中的每个作业都将是一个编码哈希。你
可以使用
Resque.decode(job)
将作业解码为 ruby 散列。 - Class 和 args: 对于 排队作业 ,
class
和args
键是job["class"]
和job["args"]
。对于失败和工作,这些是job["payload"]["class"]
和job["payload"]["args"]
。 - 队列: 对于找到的每个 失败 和 工作作业,队列将是
job["queue"]
。在测试对象 ID 的 args 列表之前,您只需要匹配class
和queue
的作业。您的排队作业 列表已经限制为您收集的队列。
下面是示例工作人员 class 和模型方法,用于查找(和删除)与示例模型对象(报告)关联的工作。
resque/redis 支持的作业的报告处理工作人员class
class ProcessReportWorker
# queue name
@queue = :report_processing
# tell the worker class where the report id is in the arguments list
REPORT_ID_ARGS_INDEX = 0
# <snip> rest of class, not needed here for this answer
# find jobs methods - find by report id (report is the 'associated' object)
def self.find_queued_jobs_by_report_id report_id
queued_jobs(@queue).select do |job|
is_job_for_report? :queued, job, report_id
end
end
def self.find_failed_jobs_by_report_id report_id
failed_jobs.select do |job|
is_job_for_report? :failed, job, report_id
end
end
def self.find_working_jobs_by_report_id report_id
working_jobs.select do |worker,job|
is_job_for_report? :working, job, report_id
end
end
# association test method - determine if this job is associated
def self.is_job_for_report? state, job, report_id
attributes = job_attributes(state, job)
attributes[:klass] == self.name &&
attributes[:queue] == @queue &&
attributes[:args][REPORT_ID_ARGS_INDEX].to_i == report_id
end
# remove jobs methods
def self.remove_failed_jobs_by_report_id report_id
find_failed_jobs_by_report_id(report_id).each do |job|
Resque::Failure.remove(job["index"])
end
end
def self.remove_queued_jobs_by_report_id report_id
find_queued_jobs_by_report_id(report_id).each do |job|
Resque::Job.destroy(@queue,job["class"],*job["args"])
end
end
# reusable methods - these methods could go elsewhere and be reusable across worker classes
# job attributes method
def self.job_attributes(state, job)
if state == :queued && job["args"].present?
args = job["args"]
klass = job["class"]
elsif job["payload"] && job["payload"]["args"].present?
args = job["payload"]["args"]
klass = job["payload"]["class"]
else
return {args: nil, klass: nil, queue: nil}
end
{args: args, klass: klass, queue: job["queue"]}
end
# jobs list methods
def self.queued_jobs queue
Resque.redis.lrange("queue:#{queue}", 0, -1)
.collect do |job|
job = Resque.decode(job)
job["queue"] = queue # for consistency only
job
end
end
def self.failed_jobs
Resque.redis.lrange("failed", 0, -1)
.each_with_index.collect do |job,index|
job = Resque.decode(job)
job["index"] = index # required if removing
job
end
end
def self.working_jobs
Resque.workers.zip(Resque.workers.map(&:job))
.reject { |w, j| w.idle? || j['queue'].nil? }
end
end
那么报表模型的使用示例就变成了
class Report < ActiveRecord::Base
before_destroy :check_if_working_jobs, :remove_queued_and_failed_jobs
def check_if_working_jobs
# find all working jobs related to this report object
working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id)
return false unless working_jobs.empty?
end
def remove_queued_and_failed_jobs
# find all jobs related to this report object
queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id)
failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id)
# extra code and conditionals here for example only as all that is really
# needed is to call the remove methods without first finding or checking
unless queued_jobs.empty?
ProcessReportWorker.remove_queued_jobs_by_report_id(self.id)
end
unless failed_jobs.empty?
ProcessReportWorker.remove_failed_jobs_by_report_id(self.id)
end
end
end
如果您为 worker class 使用多个队列,或者如果您有多个失败队列,则需要修改解决方案。此外,还使用了 redis
故障后端。如果使用不同的故障后端,则可能需要进行更改。