如何基于kubernetes集群约束delayed_job处理
How to constrain delayed_job processing based on kubernetes cluster
我正在寻找一种方法来隔离我的审阅环境处理哪些作业。
我们正在使用 delayed_job 并且 运行 一些基于主集群的 kubernetes 别名集群。
这可能吗?我找到了一种简单地为工人姓名添加前缀的方法,但我找不到将其传递给实际工作的方法。
感谢任何帮助。
我认为它应该工作的方式 is something like this。
我不确定这是否是正确的方法,也许使用生命周期事件可以达到同样的效果?我只是添加一列并使用生命周期事件来添加数据并查询它?
最终,我得到了以下解决方案。将名为 cluster 的 varchar 列添加到 delayed_jobs table 和 BOOM。很有魅力。
require 'delayed/backend/active_record'
module Delayed
module Backend
module ActiveRecord
class Configuration
attr_accessor :cluster
end
# A job object that is persisted to the database.
# Contains the work object as a YAML field.
class Job < ::ActiveRecord::Base
READY_SQL = <<~SQL.squish.freeze
((cluster = ? AND run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL
SQL
before_save :set_cluster
def self.ready_to_run(worker_name, max_run_time)
where(READY_SQL, cluster, db_time_now, db_time_now - max_run_time, worker_name)
end
# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
where(cluster: cluster, locked_by: worker_name)
.update_all(locked_by: nil, locked_at: nil) # rubocop:disable Rails/SkipsModelValidations
end
def self.cluster
Delayed::Backend::ActiveRecord.configuration.cluster
end
def set_cluster
self.cluster ||= self.class.cluster
end
end
end
end
end
Delayed::Backend::ActiveRecord.configuration.cluster = ENV['CLUSTER'] if ENV['CLUSTER']
我正在寻找一种方法来隔离我的审阅环境处理哪些作业。
我们正在使用 delayed_job 并且 运行 一些基于主集群的 kubernetes 别名集群。
这可能吗?我找到了一种简单地为工人姓名添加前缀的方法,但我找不到将其传递给实际工作的方法。
感谢任何帮助。
我认为它应该工作的方式 is something like this。
我不确定这是否是正确的方法,也许使用生命周期事件可以达到同样的效果?我只是添加一列并使用生命周期事件来添加数据并查询它?
最终,我得到了以下解决方案。将名为 cluster 的 varchar 列添加到 delayed_jobs table 和 BOOM。很有魅力。
require 'delayed/backend/active_record'
module Delayed
module Backend
module ActiveRecord
class Configuration
attr_accessor :cluster
end
# A job object that is persisted to the database.
# Contains the work object as a YAML field.
class Job < ::ActiveRecord::Base
READY_SQL = <<~SQL.squish.freeze
((cluster = ? AND run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL
SQL
before_save :set_cluster
def self.ready_to_run(worker_name, max_run_time)
where(READY_SQL, cluster, db_time_now, db_time_now - max_run_time, worker_name)
end
# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
where(cluster: cluster, locked_by: worker_name)
.update_all(locked_by: nil, locked_at: nil) # rubocop:disable Rails/SkipsModelValidations
end
def self.cluster
Delayed::Backend::ActiveRecord.configuration.cluster
end
def set_cluster
self.cluster ||= self.class.cluster
end
end
end
end
end
Delayed::Backend::ActiveRecord.configuration.cluster = ENV['CLUSTER'] if ENV['CLUSTER']