Microsoft Azure 服务总线队列作为 FIFO 工作
Microsoft Azure Service Bus Queue working as FIFO
我正在为 Azure 开发两个 WebJobs:一个使用主题将消息放入服务总线队列,另一个使用相同主题订阅 ServiceBusTrigger。
消息被正确发送到服务总线队列,但是当 运行 WebJob 订阅 ServiceBusTrigger 时,这些消息未按 FIFO 方式处理。
将消息放入服务总线队列的 WebJob 代码如下:
NamespaceManager namespaceManager = NamespaceManager.Create();
// Delete if exists
if (namespaceManager.TopicExists("SampleTopic"))
{
namespaceManager.DeleteTopic("SampleTopic");
}
TopicDescription td = new TopicDescription("SampleTopic");
td.SupportOrdering = true;
TopicDescription myTopic = namespaceManager.CreateTopic(td);
SubscriptionDescription myAuditSubscription = namespaceManager.CreateSubscription(myTopic.Path, "ImporterSubscription");
TopicClient topicClient = TopicClient.Create("SampleTopic");
for(int i = 1; i <= 10; i++)
{
var message = new BrokeredMessage("message"+i);
topicClient.Send(message);
}
topicClient.Close();
服务总线触发器订阅的WebJob有如下代码:
namespace HO.Importer.Azure.WebJob.TGZProcessor
{
public class Program
{
static void Main(string[] args)
{
JobHostConfiguration config = new JobHostConfiguration();
config.UseServiceBus();
JobHost host = new JobHost(config);
host.RunAndBlock();
}
public static void WriteLog([ServiceBusTrigger("SampleTopic", "ImporterSubscription")] string message,
TextWriter logger)
{
Console.WriteLine(message));
}
}
}
如何实现以先进先出的方式处理来自队列的消息?
提前致谢!
虽然 Azure 服务总线提供 FIFO 功能(会话),但最好不要使用基于代理的排队系统来假设这种行为。 Ben Morris 有一个很好的 post Don’t assume message ordering in Azure Service Bus 假设使用异步消息进行排序几乎是一个谬论及其原因。
使用 SessionId 或 PartitionKey,这将确保消息由同一个消息代理处理。
参见:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-partitioning
"SessionId: If a message has the BrokeredMessage.SessionId property set, then Service Bus uses this property as the partition key. This way, all messages that belong to the same session are handled by the same message broker. This enables Service Bus to guarantee message ordering as well as the consistency of session states."
我正在为 Azure 开发两个 WebJobs:一个使用主题将消息放入服务总线队列,另一个使用相同主题订阅 ServiceBusTrigger。
消息被正确发送到服务总线队列,但是当 运行 WebJob 订阅 ServiceBusTrigger 时,这些消息未按 FIFO 方式处理。
将消息放入服务总线队列的 WebJob 代码如下:
NamespaceManager namespaceManager = NamespaceManager.Create();
// Delete if exists
if (namespaceManager.TopicExists("SampleTopic"))
{
namespaceManager.DeleteTopic("SampleTopic");
}
TopicDescription td = new TopicDescription("SampleTopic");
td.SupportOrdering = true;
TopicDescription myTopic = namespaceManager.CreateTopic(td);
SubscriptionDescription myAuditSubscription = namespaceManager.CreateSubscription(myTopic.Path, "ImporterSubscription");
TopicClient topicClient = TopicClient.Create("SampleTopic");
for(int i = 1; i <= 10; i++)
{
var message = new BrokeredMessage("message"+i);
topicClient.Send(message);
}
topicClient.Close();
服务总线触发器订阅的WebJob有如下代码:
namespace HO.Importer.Azure.WebJob.TGZProcessor
{
public class Program
{
static void Main(string[] args)
{
JobHostConfiguration config = new JobHostConfiguration();
config.UseServiceBus();
JobHost host = new JobHost(config);
host.RunAndBlock();
}
public static void WriteLog([ServiceBusTrigger("SampleTopic", "ImporterSubscription")] string message,
TextWriter logger)
{
Console.WriteLine(message));
}
}
}
如何实现以先进先出的方式处理来自队列的消息?
提前致谢!
虽然 Azure 服务总线提供 FIFO 功能(会话),但最好不要使用基于代理的排队系统来假设这种行为。 Ben Morris 有一个很好的 post Don’t assume message ordering in Azure Service Bus 假设使用异步消息进行排序几乎是一个谬论及其原因。
使用 SessionId 或 PartitionKey,这将确保消息由同一个消息代理处理。
参见:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-partitioning
"SessionId: If a message has the BrokeredMessage.SessionId property set, then Service Bus uses this property as the partition key. This way, all messages that belong to the same session are handled by the same message broker. This enables Service Bus to guarantee message ordering as well as the consistency of session states."