防止两个连接读取同一行
Prevent two connections from reading same row
我正在考虑构建一个基于数据库的 Message Queue 实现。我基本上会有一个数据库 table,它将包含一个自动生成的 ID (bigint)、一个消息 ID 和消息数据。我将编写一个基于拉取的消费者,它将从 table 中查询最旧的记录 (min(id)) 并将其移交处理。
现在我的疑问是,当有多个消费者线程时,我将如何处理最旧记录的查询。如何将第一个读取记录锁定给第一个消费者并且基本上不让下一个消费者看到它。
我的一个想法是添加另一个名为 locked by 我将存储的列,假设线程名称和 select 更新记录并立即更新 locked by column 然后继续处理它.这样我就不会在下一个查询中 select 锁定列。
这是可行的解决方案吗?
编辑:
本质上,这就是我想要的。
- 连接一查询数据库 table 一行。读取第一行并在读取更新时将其锁定。
- 连接二查询数据库 table 一行。应该无法读取第一行,应该读取第二行(如果可用)并将其锁定以进行更新。
- 连接 3、4 等的类似逻辑。
- 连接一使用其标识符更新记录。处理它并随后删除记录。
- Connection one queries the database table for a row. Reads first row and locks it while reading for update.
- Connection two queries the database table for a row. Should not be able to read the first row, should read second row if available and
lock it for update.
- Similar logic for connection 3, 4, etc..
- Connection one updates the record with its identifier. Processes it and subsequently deletes the record.
TL;DR,参见 Rusanu 的 using tables as queues。下面的示例 DDL 是从文章中收集的。
CREATE TABLE dbo.FifoQueueTable (
Id bigint not null identity(1,1)
CONSTRAINT pk_FifoQueue PRIMARY KEY CLUSTERED
,Payload varbinary(MAX)
);
GO
CREATE PROCEDURE dbo.usp_EnqueueFifoTableMessage
@payload varbinary(MAX)
AS
SET NOCOUNT ON;
INSERT INTO dbo.FifoQueueTable (Payload) VALUES (@Payload);
GO
CREATE PROCEDURE dbo.usp_DequeueFifoTableMessage
AS
SET NOCOUNT ON;
WITH cte AS (
SELECT TOP(1) Payload
FROM dbo.FifoQueueTable WITH (ROWLOCK, READPAST)
ORDER BY Id
)
DELETE FROM cte
OUTPUT deleted.Payload;
GO
这个实现很简单,但处理不愉快的路径可能很复杂,具体取决于消息的性质和错误原因。
接受消息丢失table时,可以简单地使用默认的自动提交事务和日志错误。
在不能丢失消息的情况下,出列必须在客户端启动的事务中完成,并且仅在成功处理或未读取消息后才提交。如果应用程序或数据库服务崩溃,事务还将确保消息不会丢失。可靠的错误处理策略取决于错误类型、消息的性质和消息处理顺序的影响。
一条有毒消息(即负载中的错误阻止消息成功发送),可以将错误消息插入死信 table 以供后续手动审查并提交到事务。
暂时性错误,例如调用外部服务失败,可以使用以下技术来处理:
- 回滚事务,使消息在 FIFO 队列中排在第一位,以便在下一次迭代中重试。
- 重新排队错误的消息并提交,以便消息在 FIFO 队列中排在最后进行重试。
- 将错误消息与重试计数一起放入单独的重试队列中。达到重试限制后,可以将消息插入死信 table。
应用程序代码还可以在消息处理期间包含重试逻辑,但应避免长时间 运行 数据库事务并在达到一定重试阈值后回退到上述一种技术。
可以使用 Service Broker 实现这些相同的概念,以促进仅 T-SQL 的解决方案(内部激活),但在不需要时(如您的情况)会增加复杂性。请注意,SB 队列本质上实现了“READPAST”要求,但由于同一对话组中的所有消息都被锁定,这意味着每条消息都需要在单独的对话中。
我正在考虑构建一个基于数据库的 Message Queue 实现。我基本上会有一个数据库 table,它将包含一个自动生成的 ID (bigint)、一个消息 ID 和消息数据。我将编写一个基于拉取的消费者,它将从 table 中查询最旧的记录 (min(id)) 并将其移交处理。
现在我的疑问是,当有多个消费者线程时,我将如何处理最旧记录的查询。如何将第一个读取记录锁定给第一个消费者并且基本上不让下一个消费者看到它。
我的一个想法是添加另一个名为 locked by 我将存储的列,假设线程名称和 select 更新记录并立即更新 locked by column 然后继续处理它.这样我就不会在下一个查询中 select 锁定列。
这是可行的解决方案吗?
编辑: 本质上,这就是我想要的。
- 连接一查询数据库 table 一行。读取第一行并在读取更新时将其锁定。
- 连接二查询数据库 table 一行。应该无法读取第一行,应该读取第二行(如果可用)并将其锁定以进行更新。
- 连接 3、4 等的类似逻辑。
- 连接一使用其标识符更新记录。处理它并随后删除记录。
- Connection one queries the database table for a row. Reads first row and locks it while reading for update.
- Connection two queries the database table for a row. Should not be able to read the first row, should read second row if available and lock it for update.
- Similar logic for connection 3, 4, etc..
- Connection one updates the record with its identifier. Processes it and subsequently deletes the record.
TL;DR,参见 Rusanu 的 using tables as queues。下面的示例 DDL 是从文章中收集的。
CREATE TABLE dbo.FifoQueueTable (
Id bigint not null identity(1,1)
CONSTRAINT pk_FifoQueue PRIMARY KEY CLUSTERED
,Payload varbinary(MAX)
);
GO
CREATE PROCEDURE dbo.usp_EnqueueFifoTableMessage
@payload varbinary(MAX)
AS
SET NOCOUNT ON;
INSERT INTO dbo.FifoQueueTable (Payload) VALUES (@Payload);
GO
CREATE PROCEDURE dbo.usp_DequeueFifoTableMessage
AS
SET NOCOUNT ON;
WITH cte AS (
SELECT TOP(1) Payload
FROM dbo.FifoQueueTable WITH (ROWLOCK, READPAST)
ORDER BY Id
)
DELETE FROM cte
OUTPUT deleted.Payload;
GO
这个实现很简单,但处理不愉快的路径可能很复杂,具体取决于消息的性质和错误原因。
接受消息丢失table时,可以简单地使用默认的自动提交事务和日志错误。
在不能丢失消息的情况下,出列必须在客户端启动的事务中完成,并且仅在成功处理或未读取消息后才提交。如果应用程序或数据库服务崩溃,事务还将确保消息不会丢失。可靠的错误处理策略取决于错误类型、消息的性质和消息处理顺序的影响。
一条有毒消息(即负载中的错误阻止消息成功发送),可以将错误消息插入死信 table 以供后续手动审查并提交到事务。
暂时性错误,例如调用外部服务失败,可以使用以下技术来处理:
- 回滚事务,使消息在 FIFO 队列中排在第一位,以便在下一次迭代中重试。
- 重新排队错误的消息并提交,以便消息在 FIFO 队列中排在最后进行重试。
- 将错误消息与重试计数一起放入单独的重试队列中。达到重试限制后,可以将消息插入死信 table。
应用程序代码还可以在消息处理期间包含重试逻辑,但应避免长时间 运行 数据库事务并在达到一定重试阈值后回退到上述一种技术。
可以使用 Service Broker 实现这些相同的概念,以促进仅 T-SQL 的解决方案(内部激活),但在不需要时(如您的情况)会增加复杂性。请注意,SB 队列本质上实现了“READPAST”要求,但由于同一对话组中的所有消息都被锁定,这意味着每条消息都需要在单独的对话中。