如何停止集群中调度程序作业的重复执行
How to stop duplicate execution of scheduler jobs in cluster
我们有一个使用计划作业的节点应用程序,我的计划作业一天工作 6 次。
现在我们要将节点应用程序升级为基于集群的,我的意思是主节点和工作节点。
通过这样做,我看到作业被复制并执行了多次。
- 我必须删除重复执行
- 如果 master 未能 运行 作业或 master 应用停止,worker 应该执行
工作
注意:我们使用 sql 服务器作为数据库。
第一
SQL 服务器是实现此功能的最差软件,尤其是当它非常动态时。原因是像 MEMORY 和 MyISAM 这样的引擎只有 full-table 锁,而像 InnoDB 这样的更多 suitable 引擎有更高的写惩罚(以提供 ACID 属性)并且针对访问空间记录进行了优化并且暂时关闭,这与您提供的场景不同。
SQL服务器的解决方案
但是,如果你坚持使用SQL服务器。这是解决方案-
在关系数据库系统中实现作业队列的最佳方式是使用SKIP LOCK。调度可以归类为图问题,其中 SQL table 中的每个事务都可以被视为一个节点,并且您只能在 DAG fashion 中访问这些节点一次。要解决此问题,您需要将 SKIP LOCKS 与作业队列架构一起使用
QueueMsgId identity -- NOT NULL
QueueMsgType varchar(20) -- NOT NULL
QueueState char(1) -- 'N' New if queued, 'A' Active if processing, 'C' Completed, default 'N' -- NOT NULL
CreateTime datetime -- default GETDATE() -- NOT NULL
QueueMsg varchar(255) -- NULLable
QueueClient 提取一条消息,并在处理消息时将状态更改为活动。完成后,它会将状态更改为 Complete。 SKIP LOCKED 是一种锁定机制,它获取对 read/share(用于共享)或 write/exclusive(用于更新)的锁定。
如果我们有多个并发用户试图访问队列记录,那么我们要确保没有人可以在我们读取数据后更改数据,我们可以使用repeatable read。通过使用它,我们所做的任何 tables 的任何读取都将被锁定以进行更新或删除。执行以下 SELECT 查询,这些查询专门锁定队列记录,同时还为并发添加 SKIP LOCKED 选项:
SELECT
q.QueueMsgId AS id1,
q.QueueMsgType AS msgType1,
q.QueueState AS state1,
FROM
Queue q
WHERE
q.QueueState = 'A'
ORDER BY
q.QueueMsgId
LIMIT 2
FOR UPDATE OF q SKIP LOCKED
对于多个用户,执行相同的查询,Queue记录不会有冲突
最佳解决方案
DUMP SQL 并转移到 REDIS 以获得每秒高查询吞吐量。虽然 Redis 的数据结构是并发的,但很难处理。幸运的是,我们在 node.js - BULL, for cluster-based deployment check this example.
中为 Job/Message 队列系统提供了开箱即用的解决方案
我们有一个使用计划作业的节点应用程序,我的计划作业一天工作 6 次。 现在我们要将节点应用程序升级为基于集群的,我的意思是主节点和工作节点。 通过这样做,我看到作业被复制并执行了多次。
- 我必须删除重复执行
- 如果 master 未能 运行 作业或 master 应用停止,worker 应该执行 工作
注意:我们使用 sql 服务器作为数据库。
第一
SQL 服务器是实现此功能的最差软件,尤其是当它非常动态时。原因是像 MEMORY 和 MyISAM 这样的引擎只有 full-table 锁,而像 InnoDB 这样的更多 suitable 引擎有更高的写惩罚(以提供 ACID 属性)并且针对访问空间记录进行了优化并且暂时关闭,这与您提供的场景不同。
SQL服务器的解决方案
但是,如果你坚持使用SQL服务器。这是解决方案-
在关系数据库系统中实现作业队列的最佳方式是使用SKIP LOCK。调度可以归类为图问题,其中 SQL table 中的每个事务都可以被视为一个节点,并且您只能在 DAG fashion 中访问这些节点一次。要解决此问题,您需要将 SKIP LOCKS 与作业队列架构一起使用
QueueMsgId identity -- NOT NULL
QueueMsgType varchar(20) -- NOT NULL
QueueState char(1) -- 'N' New if queued, 'A' Active if processing, 'C' Completed, default 'N' -- NOT NULL
CreateTime datetime -- default GETDATE() -- NOT NULL
QueueMsg varchar(255) -- NULLable
QueueClient 提取一条消息,并在处理消息时将状态更改为活动。完成后,它会将状态更改为 Complete。 SKIP LOCKED 是一种锁定机制,它获取对 read/share(用于共享)或 write/exclusive(用于更新)的锁定。
如果我们有多个并发用户试图访问队列记录,那么我们要确保没有人可以在我们读取数据后更改数据,我们可以使用repeatable read。通过使用它,我们所做的任何 tables 的任何读取都将被锁定以进行更新或删除。执行以下 SELECT 查询,这些查询专门锁定队列记录,同时还为并发添加 SKIP LOCKED 选项:
SELECT
q.QueueMsgId AS id1,
q.QueueMsgType AS msgType1,
q.QueueState AS state1,
FROM
Queue q
WHERE
q.QueueState = 'A'
ORDER BY
q.QueueMsgId
LIMIT 2
FOR UPDATE OF q SKIP LOCKED
对于多个用户,执行相同的查询,Queue记录不会有冲突
最佳解决方案
DUMP SQL 并转移到 REDIS 以获得每秒高查询吞吐量。虽然 Redis 的数据结构是并发的,但很难处理。幸运的是,我们在 node.js - BULL, for cluster-based deployment check this example.
中为 Job/Message 队列系统提供了开箱即用的解决方案