Azure:具有唯一键的循环(环形)队列
Azure: circular (ring) queue with unique key
对于我的应用程序,我需要组织一个循环(环形)队列。这意味着任何已处理的消息都会立即进入队列的末尾以进行连续处理。
例如:
- 队列:
A
、B
、C
。
- 接收方进程
A
。
- 队列:
B
、C
、A
。
2 和 3 应该自动执行。所以我们永远不会丢失 A
或任何其他消息。
另一个要求是忽略重复项。所以队列中应该总是有一个A
。即使发件人推送另一个 A
项目。 A
指的是此处消息的一些唯一(主)键。
我寻找使用 Azure 服务总线的方法,但找不到如何用它来满足这两个要求。是否可以使用服务总线实现该场景?如果不是,最好的选择是什么?
基于我对 Azure 服务总线的了解,我相信这两个要求都可以单独满足,尽管我不确定如何同时满足它们。
Message Cycling
据我了解,Azure 服务总线支持 First-In-First-Out (FIFO)
行为。您可以做的是以 Receive and Delete
模式从队列顶部获取消息(比如 A),然后将消息重新插入队列中。由于您正在创建新消息,它将被发布到队列的末尾。
Avoid Duplicate Messages
服务总线队列有一个名为 RequiresDuplicateDetection
的布尔值 属性,相应地设置此值将防止插入重复的消息。只需搜索 Azure Service Bus Duplicate Detection
即可找到许多示例。
这种队列可以用Service Bus sessions来实现。会话提供 "group by" 机制,因此我们可以将我们的唯一密钥分配给消息的 SessionId
,然后分组接收消息,忽略组中除第一个消息之外的所有消息。
实施
1) 创建一个 RequiresSession
设置为 true 的队列:
var queueDescription = new QueueDescription("CircularQueue")
{
RequiresSession = true,
};
await namespaceManager.CreateQueueAsync(queueDescription);
2) 向队列发送消息时,将SessionId
设置为您的唯一键值:
var message = new BrokeredMessage($"Message body")
{
MessageId = "MESSAGE_UNIQUE_KEY",
SessionId = "MESSAGE_UNIQUE_KEY"
};
await queueClient.SendAsync(message);
3) 使用会话接收消息:
while (true)
{
var session = await queueClient.AcceptMessageSessionAsync(TimeSpan.FromSeconds(10));
if (session == null)
continue;
try
{
var messages = (await session.ReceiveBatchAsync(100)).ToList();
if (messages.Count == 0)
continue;
var message = messages[0];
ProcessMessage(message);
await queueClient.SendAsync(message.Clone());
await session.CompleteBatchAsync(messages.Select(msg => msg.LockToken));
}
finally
{
await session.CloseAsync();
}
}
对于我的应用程序,我需要组织一个循环(环形)队列。这意味着任何已处理的消息都会立即进入队列的末尾以进行连续处理。
例如:
- 队列:
A
、B
、C
。 - 接收方进程
A
。 - 队列:
B
、C
、A
。
2 和 3 应该自动执行。所以我们永远不会丢失 A
或任何其他消息。
另一个要求是忽略重复项。所以队列中应该总是有一个A
。即使发件人推送另一个 A
项目。 A
指的是此处消息的一些唯一(主)键。
我寻找使用 Azure 服务总线的方法,但找不到如何用它来满足这两个要求。是否可以使用服务总线实现该场景?如果不是,最好的选择是什么?
基于我对 Azure 服务总线的了解,我相信这两个要求都可以单独满足,尽管我不确定如何同时满足它们。
Message Cycling
据我了解,Azure 服务总线支持 First-In-First-Out (FIFO)
行为。您可以做的是以 Receive and Delete
模式从队列顶部获取消息(比如 A),然后将消息重新插入队列中。由于您正在创建新消息,它将被发布到队列的末尾。
Avoid Duplicate Messages
服务总线队列有一个名为 RequiresDuplicateDetection
的布尔值 属性,相应地设置此值将防止插入重复的消息。只需搜索 Azure Service Bus Duplicate Detection
即可找到许多示例。
这种队列可以用Service Bus sessions来实现。会话提供 "group by" 机制,因此我们可以将我们的唯一密钥分配给消息的 SessionId
,然后分组接收消息,忽略组中除第一个消息之外的所有消息。
实施
1) 创建一个 RequiresSession
设置为 true 的队列:
var queueDescription = new QueueDescription("CircularQueue")
{
RequiresSession = true,
};
await namespaceManager.CreateQueueAsync(queueDescription);
2) 向队列发送消息时,将SessionId
设置为您的唯一键值:
var message = new BrokeredMessage($"Message body")
{
MessageId = "MESSAGE_UNIQUE_KEY",
SessionId = "MESSAGE_UNIQUE_KEY"
};
await queueClient.SendAsync(message);
3) 使用会话接收消息:
while (true)
{
var session = await queueClient.AcceptMessageSessionAsync(TimeSpan.FromSeconds(10));
if (session == null)
continue;
try
{
var messages = (await session.ReceiveBatchAsync(100)).ToList();
if (messages.Count == 0)
continue;
var message = messages[0];
ProcessMessage(message);
await queueClient.SendAsync(message.Clone());
await session.CompleteBatchAsync(messages.Select(msg => msg.LockToken));
}
finally
{
await session.CloseAsync();
}
}