Azure 服务消息 - 一些消息被删除但未收到
Azure service message - Some messages are getting deleted but not received
我一直在阅读消息。但是即使队列变空,此代码也不会读取所有消息,但是不会读取消息。
在 while 循环中读取消息。
var serviceBusHelper = new ServiceBusMessageHelper();
while(true)
{
try
{
var timeToPollMessage = System.Diagnostics.Stopwatch.StartNew();
messages = await serviceBusHelper.ReadMessagesAsync(queueName, 10);
var elapsedTimeToPollMessageMs = timeToPollMessage.ElapsedMilliseconds;
AppLogger.LogError("**** Elapsed to Poll message batch in ms " + elapsedTimeToPollMessageMs + " ****");
}
}
class ServiceBusMessageHelper
{
static string connectionString = "Endpoint=sb://xxxxxxxxxxx";
static ServiceBusClient serviceBusuClient = new ServiceBusClient(connectionString);
public async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReadMessagesAsync(string queueName, int batchSize)
{
var receiverOptions = new ServiceBusReceiverOptions()
{
ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
};
var receiver = serviceBusuClient.CreateReceiver(queueName, receiverOptions);
//await receiver.CompleteMessageAsync
return await receiver.ReceiveMessagesAsync(batchSize);
}
}
解决方案 - 我已经按照 sean 的建议重新安排了代码。
var serviceBusHelper = new ServiceBusMessageHelper();
string connectionString = "Endpoint=xxxxxxxxxxxxxxxxx";
ServiceBusClient serviceBusClient = new ServiceBusClient(connectionString);
var receiverOptions = new ServiceBusReceiverOptions()
{
ReceiveMode = ServiceBusReceiveMode.PeekLock,
};
var receiver = serviceBusClient.CreateReceiver(queueName, receiverOptions);
int batchSize = 10;
while (true)
{
var timeToCollect100Books = System.Diagnostics.Stopwatch.StartNew();
IReadOnlyList<ServiceBusReceivedMessage> messages = null;
try
{
BackToPollMessage:
var timeToPollMessage = System.Diagnostics.Stopwatch.StartNew();
messages = await receiver.ReceiveMessagesAsync(batchSize);
AppLogger.LogError("Received Messages : " + messages.Count());
var elapsedTimeToPollMessageMs = timeToPollMessage.ElapsedMilliseconds;
AppLogger.LogError("**** Elapsed to Poll message batch in ms " + elapsedTimeToPollMessageMs + " ****");
foreach (var repricedMessage in messages)
{
await receiver.CompleteMessageAsync(repricedMessage);
}
}
catch(Exception ex)
{
}
}
它正在解决读取所有消息的问题,但现在出现了一个新问题,即传出消息现在比传入消息多得多。即使我只有这个消费者的一个实例 运行
队列配置
您 运行 处于危险的 ReceiveAndDelete
模式。这意味着从代理请求消息并传递给客户端的那一刻,它将立即从代理中删除,无论客户端是否成功处理。除非这是您的意图,否则我建议您切换到 PeekLock
模式并在处理完传入消息后完成这些消息。
此外,此代码不断地为循环的每次迭代创建消息接收器。强烈建议查看您要实现的目标及其实现方式。
我一直在阅读消息。但是即使队列变空,此代码也不会读取所有消息,但是不会读取消息。
在 while 循环中读取消息。
var serviceBusHelper = new ServiceBusMessageHelper();
while(true)
{
try
{
var timeToPollMessage = System.Diagnostics.Stopwatch.StartNew();
messages = await serviceBusHelper.ReadMessagesAsync(queueName, 10);
var elapsedTimeToPollMessageMs = timeToPollMessage.ElapsedMilliseconds;
AppLogger.LogError("**** Elapsed to Poll message batch in ms " + elapsedTimeToPollMessageMs + " ****");
}
}
class ServiceBusMessageHelper
{
static string connectionString = "Endpoint=sb://xxxxxxxxxxx";
static ServiceBusClient serviceBusuClient = new ServiceBusClient(connectionString);
public async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReadMessagesAsync(string queueName, int batchSize)
{
var receiverOptions = new ServiceBusReceiverOptions()
{
ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
};
var receiver = serviceBusuClient.CreateReceiver(queueName, receiverOptions);
//await receiver.CompleteMessageAsync
return await receiver.ReceiveMessagesAsync(batchSize);
}
}
解决方案 - 我已经按照 sean 的建议重新安排了代码。
var serviceBusHelper = new ServiceBusMessageHelper();
string connectionString = "Endpoint=xxxxxxxxxxxxxxxxx";
ServiceBusClient serviceBusClient = new ServiceBusClient(connectionString);
var receiverOptions = new ServiceBusReceiverOptions()
{
ReceiveMode = ServiceBusReceiveMode.PeekLock,
};
var receiver = serviceBusClient.CreateReceiver(queueName, receiverOptions);
int batchSize = 10;
while (true)
{
var timeToCollect100Books = System.Diagnostics.Stopwatch.StartNew();
IReadOnlyList<ServiceBusReceivedMessage> messages = null;
try
{
BackToPollMessage:
var timeToPollMessage = System.Diagnostics.Stopwatch.StartNew();
messages = await receiver.ReceiveMessagesAsync(batchSize);
AppLogger.LogError("Received Messages : " + messages.Count());
var elapsedTimeToPollMessageMs = timeToPollMessage.ElapsedMilliseconds;
AppLogger.LogError("**** Elapsed to Poll message batch in ms " + elapsedTimeToPollMessageMs + " ****");
foreach (var repricedMessage in messages)
{
await receiver.CompleteMessageAsync(repricedMessage);
}
}
catch(Exception ex)
{
}
}
它正在解决读取所有消息的问题,但现在出现了一个新问题,即传出消息现在比传入消息多得多。即使我只有这个消费者的一个实例 运行
队列配置
您 运行 处于危险的 ReceiveAndDelete
模式。这意味着从代理请求消息并传递给客户端的那一刻,它将立即从代理中删除,无论客户端是否成功处理。除非这是您的意图,否则我建议您切换到 PeekLock
模式并在处理完传入消息后完成这些消息。
此外,此代码不断地为循环的每次迭代创建消息接收器。强烈建议查看您要实现的目标及其实现方式。