Azure Service Bus SubscriptionClient 高延迟/不同时接收消息
Azure Service Bus SubscriptionClient high latency / not receiving messages concurrently
如果我将一批消息发送到主题,并使用订阅客户端读取消息,那么我似乎会顺序接收消息,即每发送一条消息都会触发 OnMessageAsync ,但是每个接收事件之间存在明显的(150+ 毫秒)延迟
发件人:
var factory = MessagingFactory.CreateFromConnectionString("blah");
sender = factory.CreateMessageSender("MyTopicName");
var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
tasks.Add(sender.SendAsync(new BrokeredMessage("My Message"))
.ContinueWith(t => Log("Sent Message {i}"));
await Task.WhenAll(tasks); // This completes within a few millis
接收器:
receiver = factory.CreateSubscriptionClient("MyTopicName", "MySubscription");
_sbClient.OnMessageAsync(async message =>
{
var msg = message.GetBody<string>();
Log("Received message xxxx
await message.CompleteAsync();
});
这意味着发送的第10条消息在发送后1.5秒多才收到。
Azure 延迟测试显示我正在使用的数据中心有大约 200 毫秒的延迟,所以我不希望消息在此之前返回(实际上第一条消息是在这之后不久收到的),但是我不会不要期待我看到的 'cumulative' 行为。
尝试使用 MaxConcurrentCalls 并在 OnMessageAsync 中添加延迟,表明它按预期工作,我可以看到一次只处理 MaxConcurrentCalls
我搞砸了 DeleteOnReceive 模式,启用“Express”,禁用“Partitioning[=41” =]',使用 AMQP 而不是 SBMP 等,但是似乎没有什么区别。
[我正在使用 Microsoft.ServiceBus,版本=3.0.0.0]
编辑:
这是日志的样子。所以如果我同时发送10条消息,我只会在发送后1.5秒收到第10条消息:
18:09:32.624 Sent message 0
18:09:32.624 Sent message 1
18:09:32.641 Sent message 2
18:09:32.641 Sent message 3
18:09:32.674 Sent message 4
18:09:32.674 Sent message 5
18:09:32.709 Sent message 6
18:09:32.709 Sent message 7
18:09:32.738 Sent message 8
18:09:32.738 Sent message 9
18:09:32.791 Received message 1 in 341 millis
18:09:32.950 Received message 2 in 487 millis
18:09:33.108 Received message 3 in 628 millis
18:09:33.265 Received message 4 in 770 millis
18:09:33.426 Received message 5 in 914 millis
18:09:33.586 Received message 6 in 1060 millis
18:09:33.745 Received message 7 in 1202 millis
18:09:33.906 Received message 8 in 1347 millis
18:09:34.065 Received message 9 in 1492 millis
基本上,您处理消息的速度 比服务总线传送新消息的速度 快得多。 Azure SB 在 individual-message 基础上相对较慢。通过在完成前添加 Task.Delay
来验证这一点并记录线程 ID,您应该会看到多个副本旋转起来。
在深入了解 OnMessage 消息泵的工作原理后,我意识到这实际上是一种 轮询 机制,其中对 ServiceBus 的底层调用仍然是一个 'Receive()' 尝试 拉取 任何新消息。如果超时,调用将无限期地再次进行。
如果对 Receive() 的调用 仅返回一条消息 ,然后需要 150 毫秒的往返时间来检索下一条消息,那么我看到的行为就有意义了
输入PrefetchCount。在 SubscriptionClient 上将其设置为非零值有效地允许底层 Receive() 提取多条消息,然后将这些消息缓存起来并(立即)可用于冒泡到 OnMessage。
如果我将一批消息发送到主题,并使用订阅客户端读取消息,那么我似乎会顺序接收消息,即每发送一条消息都会触发 OnMessageAsync ,但是每个接收事件之间存在明显的(150+ 毫秒)延迟
发件人:
var factory = MessagingFactory.CreateFromConnectionString("blah");
sender = factory.CreateMessageSender("MyTopicName");
var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
tasks.Add(sender.SendAsync(new BrokeredMessage("My Message"))
.ContinueWith(t => Log("Sent Message {i}"));
await Task.WhenAll(tasks); // This completes within a few millis
接收器:
receiver = factory.CreateSubscriptionClient("MyTopicName", "MySubscription");
_sbClient.OnMessageAsync(async message =>
{
var msg = message.GetBody<string>();
Log("Received message xxxx
await message.CompleteAsync();
});
这意味着发送的第10条消息在发送后1.5秒多才收到。
Azure 延迟测试显示我正在使用的数据中心有大约 200 毫秒的延迟,所以我不希望消息在此之前返回(实际上第一条消息是在这之后不久收到的),但是我不会不要期待我看到的 'cumulative' 行为。
尝试使用 MaxConcurrentCalls 并在 OnMessageAsync 中添加延迟,表明它按预期工作,我可以看到一次只处理 MaxConcurrentCalls
我搞砸了 DeleteOnReceive 模式,启用“Express”,禁用“Partitioning[=41” =]',使用 AMQP 而不是 SBMP 等,但是似乎没有什么区别。
[我正在使用 Microsoft.ServiceBus,版本=3.0.0.0]
编辑:
这是日志的样子。所以如果我同时发送10条消息,我只会在发送后1.5秒收到第10条消息:
18:09:32.624 Sent message 0
18:09:32.624 Sent message 1
18:09:32.641 Sent message 2
18:09:32.641 Sent message 3
18:09:32.674 Sent message 4
18:09:32.674 Sent message 5
18:09:32.709 Sent message 6
18:09:32.709 Sent message 7
18:09:32.738 Sent message 8
18:09:32.738 Sent message 918:09:32.791 Received message 1 in 341 millis
18:09:32.950 Received message 2 in 487 millis
18:09:33.108 Received message 3 in 628 millis
18:09:33.265 Received message 4 in 770 millis
18:09:33.426 Received message 5 in 914 millis
18:09:33.586 Received message 6 in 1060 millis
18:09:33.745 Received message 7 in 1202 millis
18:09:33.906 Received message 8 in 1347 millis
18:09:34.065 Received message 9 in 1492 millis
基本上,您处理消息的速度 比服务总线传送新消息的速度 快得多。 Azure SB 在 individual-message 基础上相对较慢。通过在完成前添加 Task.Delay
来验证这一点并记录线程 ID,您应该会看到多个副本旋转起来。
在深入了解 OnMessage 消息泵的工作原理后,我意识到这实际上是一种 轮询 机制,其中对 ServiceBus 的底层调用仍然是一个 'Receive()' 尝试 拉取 任何新消息。如果超时,调用将无限期地再次进行。 如果对 Receive() 的调用 仅返回一条消息 ,然后需要 150 毫秒的往返时间来检索下一条消息,那么我看到的行为就有意义了
输入PrefetchCount。在 SubscriptionClient 上将其设置为非零值有效地允许底层 Receive() 提取多条消息,然后将这些消息缓存起来并(立即)可用于冒泡到 OnMessage。