Azure 服务总线接收和删除
Azure Service Bus ReceiveAndDelete
所以我创建了示例应用程序,其中应用程序将消息发送到队列并读取消息。对于这个应用程序,我使用“ReceiveAndDelete”,下面是示例代码
创建消息
/// <summary>
/// Sends a single text message to the given Service Bus queue.
/// </summary>
/// <param name="queueName">Name of the queue the message is sent to.</param>
/// <param name="textMessage">Text used as the body of the message.</param>
/// <returns>A task that completes once the message has been sent.</returns>
/// <remarks>
/// Returns <see cref="Task"/> instead of <c>void</c> so that callers can await the
/// send and observe any exception it throws; an <c>async void</c> method can be
/// neither awaited nor have its exceptions caught by the caller.
/// </remarks>
private static async Task CreateMessage(string queueName, string textMessage)
{
    // create a Service Bus client; it is disposed asynchronously when the block exits
    await using (ServiceBusClient client = new ServiceBusClient(connectionString))
    {
        // create a sender for the queue
        ServiceBusSender sender = client.CreateSender(queueName);

        // create a message that we can send
        ServiceBusMessage message = new ServiceBusMessage(textMessage);

        // send the message
        await sender.SendMessageAsync(message);
        Console.WriteLine($"Sent a single message to the queue: {queueName}");
    }
}
接收消息
/// <summary>
/// Handles a received message by writing its body to the console.
/// </summary>
/// <param name="args">Event data carrying the received message.</param>
/// <returns>An already-completed task; the handler does no asynchronous work.</returns>
/// <remarks>
/// The processor is created with ServiceBusReceiveMode.ReceiveAndDelete, so the
/// message has already been removed from the queue when it is delivered here.
/// Settlement calls such as CompleteMessageAsync only apply to PeekLock mode and
/// are rejected in ReceiveAndDelete mode, so no settlement is attempted.
/// </remarks>
static Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    Console.WriteLine($"Received: {body}");
    return Task.CompletedTask;
}
/// <summary>
/// Error callback for the processor: writes the reported exception to the console.
/// </summary>
/// <param name="args">Event data carrying the exception that was raised.</param>
/// <returns>An already-completed task.</returns>
static Task ErrorHandler(ProcessErrorEventArgs args)
{
    string details = args.Exception.ToString();
    Console.WriteLine(details);
    return Task.CompletedTask;
}
/// <summary>
/// Runs a queue processor in ReceiveAndDelete mode until the user presses a key,
/// then stops it.
/// </summary>
/// <returns>A task that completes after the processor has been stopped.</returns>
static async Task ReceiveMessagesAsync()
{
    // Build the processor configuration one setting at a time.
    var options = new ServiceBusProcessorOptions();
    options.AutoCompleteMessages = false;
    options.MaxConcurrentCalls = 1;
    options.MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10);
    options.ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete;
    options.PrefetchCount = 1;

    // The client lives until the end of this method and is then disposed asynchronously.
    await using ServiceBusClient serviceBusClient = new ServiceBusClient(connectionString);

    // Processor bound to the queue, with the message and error callbacks attached.
    ServiceBusProcessor queueProcessor = serviceBusClient.CreateProcessor(queueName, options);
    queueProcessor.ProcessMessageAsync += MessageHandler;
    queueProcessor.ProcessErrorAsync += ErrorHandler;

    // Begin pumping messages, then block until a key is pressed.
    await queueProcessor.StartProcessingAsync();
    Console.WriteLine("Wait for a minute and then press any key to end the processing");
    Console.ReadKey();

    // Shut the message pump down.
    Console.WriteLine("\nStopping the receiver...");
    await queueProcessor.StopProcessingAsync();
    Console.WriteLine("Stopped receiving messages");
}
主要方法
// Service Bus connection string ("***" is presumably a redacted placeholder) and target queue.
static string connectionString = "***";
static string queueName = "firstqueue";

/// <summary>
/// Entry point: sends two test messages to the queue, then runs the receiver.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static async Task Main(string[] args)
{
    // No try/catch: a failure in any step propagates to the runtime unchanged.
    await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
    await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");
    await ReceiveMessagesAsync();

    // Keep the console window open until a key is pressed.
    Console.ReadKey();
}
一切正常,但一旦应用程序调用“await processor.StartProcessingAsync();”,即使尚未处理所有消息,队列中的所有消息也会被读取。在我的示例中,队列中有两条消息,但是当调用“await processor.StartProcessingAsync();”时,消息计数变为零(即所有消息都已出队),然后才开始逐条处理消息。据我了解,如果某条消息尚未开始处理,它就应该仍在队列中。在这个例子中,应该只有一条消息从队列中删除,第二条消息应该仍然在队列中可见。
这是预期的行为还是我遗漏了什么?
Is this the expected behavior or am I missing something here?
这是 ReceiveAndDelete
模式的预期行为。服务总线会在消息发送到客户端后立即将其删除,而不管客户端是否能够处理该消息。
从这个link
:
This operation receives a message from a queue or subscription, and
removes the message from that queue or subscription in one atomic
operation.
如果你想控制这个行为,可以改用 PeekLock
模式获取消息,处理消息,并在处理成功后调用消息上的 Complete
方法将其删除。
更新
我试过你的代码,这是我的观察结果:
使用PrefetchCount = 1
,第一次从队列中取出2条消息并删除。之后,将获取并删除一条消息。可能的解释是预取了 1 条消息,根据请求获取了 1 条消息。
使用 PrefetchCount = 0
(或从 processorOptions 中省略该设置),每次只获取并删除一条消息。
请尝试以下代码:
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
namespace SO67076189
{
    /// <summary>
    /// Console sample: sends five messages to a Service Bus queue and reads them
    /// back with a processor running in ReceiveAndDelete mode.
    /// </summary>
    class Program
    {
        static string connectionString = "connection-string";
        static string queueName = "queue-name";

        /// <summary>
        /// Entry point: sends five test messages, then runs the receiver.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            // No try/catch: a failure in any step propagates to the runtime unchanged.
            await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
            await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");
            await CreateMessage(queueName, "Message 3 to test 'ReceiveAndDelete'");
            await CreateMessage(queueName, "Message 4 to test 'ReceiveAndDelete'");
            await CreateMessage(queueName, "Message 5 to test 'ReceiveAndDelete'");
            await ReceiveMessagesAsync();

            // Keep the console window open until a key is pressed.
            Console.ReadKey();
        }

        /// <summary>
        /// Sends a single text message to the given Service Bus queue.
        /// </summary>
        /// <param name="queueName">Name of the queue the message is sent to.</param>
        /// <param name="textMessage">Text used as the body of the message.</param>
        /// <returns>A task that completes once the message has been sent.</returns>
        private static async Task CreateMessage(string queueName, string textMessage)
        {
            // create a Service Bus client; it is disposed asynchronously when the block exits
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the queue
                ServiceBusSender sender = client.CreateSender(queueName);

                // create a message that we can send
                ServiceBusMessage message = new ServiceBusMessage(textMessage);

                // send the message
                await sender.SendMessageAsync(message);
                Console.WriteLine($"Sent a single message to the queue: {queueName}");
            }
        }

        /// <summary>
        /// Handles a received message by writing its body to the console.
        /// </summary>
        /// <param name="args">Event data carrying the received message.</param>
        /// <returns>An already-completed task; the handler does no asynchronous work.</returns>
        /// <remarks>
        /// The message is not completed here: the processor runs in ReceiveAndDelete
        /// mode, so the message is already gone from the queue when it is delivered.
        /// </remarks>
        static Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();
            Console.WriteLine($"Received: {body}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Handles any error raised while receiving messages by writing it to the console.
        /// </summary>
        /// <param name="args">Event data carrying the exception that was raised.</param>
        /// <returns>An already-completed task.</returns>
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }

        /// <summary>
        /// Runs a queue processor in ReceiveAndDelete mode until the user presses a
        /// key, then stops it.
        /// </summary>
        /// <returns>A task that completes after the processor has been stopped.</returns>
        static async Task ReceiveMessagesAsync()
        {
            // Only the receive mode is set. PrefetchCount in particular is deliberately
            // left unset so that no message beyond the one being requested is pulled
            // from (and therefore deleted from) the queue ahead of processing.
            var processorOptions = new ServiceBusProcessorOptions
            {
                ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
            };

            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a processor that we can use to process the messages
                ServiceBusProcessor processor = client.CreateProcessor(queueName, processorOptions);

                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;

                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;

                // start processing
                await processor.StartProcessingAsync();
                Console.WriteLine("Wait for a minute and then press any key to end the processing");
                Console.ReadKey();

                // stop processing
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
        }
    }
}
所以我创建了示例应用程序,其中应用程序将消息发送到队列并读取消息。对于这个应用程序,我使用“ReceiveAndDelete”,下面是示例代码
创建消息
/// <summary>
/// Sends a single text message to the given Service Bus queue.
/// </summary>
/// <param name="queueName">Name of the queue the message is sent to.</param>
/// <param name="textMessage">Text used as the body of the message.</param>
/// <returns>A task that completes once the message has been sent.</returns>
/// <remarks>
/// Returns <see cref="Task"/> instead of <c>void</c> so that callers can await the
/// send and observe any exception it throws; an <c>async void</c> method can be
/// neither awaited nor have its exceptions caught by the caller.
/// </remarks>
private static async Task CreateMessage(string queueName, string textMessage)
{
    // create a Service Bus client; it is disposed asynchronously when the block exits
    await using (ServiceBusClient client = new ServiceBusClient(connectionString))
    {
        // create a sender for the queue
        ServiceBusSender sender = client.CreateSender(queueName);

        // create a message that we can send
        ServiceBusMessage message = new ServiceBusMessage(textMessage);

        // send the message
        await sender.SendMessageAsync(message);
        Console.WriteLine($"Sent a single message to the queue: {queueName}");
    }
}
接收消息
/// <summary>
/// Handles a received message by writing its body to the console.
/// </summary>
/// <param name="args">Event data carrying the received message.</param>
/// <returns>An already-completed task; the handler does no asynchronous work.</returns>
/// <remarks>
/// The processor is created with ServiceBusReceiveMode.ReceiveAndDelete, so the
/// message has already been removed from the queue when it is delivered here.
/// Settlement calls such as CompleteMessageAsync only apply to PeekLock mode and
/// are rejected in ReceiveAndDelete mode, so no settlement is attempted.
/// </remarks>
static Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    Console.WriteLine($"Received: {body}");
    return Task.CompletedTask;
}
/// <summary>
/// Error callback for the processor: writes the reported exception to the console.
/// </summary>
/// <param name="args">Event data carrying the exception that was raised.</param>
/// <returns>An already-completed task.</returns>
static Task ErrorHandler(ProcessErrorEventArgs args)
{
    string details = args.Exception.ToString();
    Console.WriteLine(details);
    return Task.CompletedTask;
}
/// <summary>
/// Runs a queue processor in ReceiveAndDelete mode until the user presses a key,
/// then stops it.
/// </summary>
/// <returns>A task that completes after the processor has been stopped.</returns>
static async Task ReceiveMessagesAsync()
{
    // Build the processor configuration one setting at a time.
    var options = new ServiceBusProcessorOptions();
    options.AutoCompleteMessages = false;
    options.MaxConcurrentCalls = 1;
    options.MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10);
    options.ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete;
    options.PrefetchCount = 1;

    // The client lives until the end of this method and is then disposed asynchronously.
    await using ServiceBusClient serviceBusClient = new ServiceBusClient(connectionString);

    // Processor bound to the queue, with the message and error callbacks attached.
    ServiceBusProcessor queueProcessor = serviceBusClient.CreateProcessor(queueName, options);
    queueProcessor.ProcessMessageAsync += MessageHandler;
    queueProcessor.ProcessErrorAsync += ErrorHandler;

    // Begin pumping messages, then block until a key is pressed.
    await queueProcessor.StartProcessingAsync();
    Console.WriteLine("Wait for a minute and then press any key to end the processing");
    Console.ReadKey();

    // Shut the message pump down.
    Console.WriteLine("\nStopping the receiver...");
    await queueProcessor.StopProcessingAsync();
    Console.WriteLine("Stopped receiving messages");
}
主要方法
// Service Bus connection string ("***" is presumably a redacted placeholder) and target queue.
static string connectionString = "***";
static string queueName = "firstqueue";

/// <summary>
/// Entry point: sends two test messages to the queue, then runs the receiver.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static async Task Main(string[] args)
{
    // No try/catch: a failure in any step propagates to the runtime unchanged.
    await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
    await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");
    await ReceiveMessagesAsync();

    // Keep the console window open until a key is pressed.
    Console.ReadKey();
}
一切正常,但一旦应用程序调用“await processor.StartProcessingAsync();”,即使尚未处理所有消息,队列中的所有消息也会被读取。在我的示例中,队列中有两条消息,但是当调用“await processor.StartProcessingAsync();”时,消息计数变为零(即所有消息都已出队),然后才开始逐条处理消息。据我了解,如果某条消息尚未开始处理,它就应该仍在队列中。在这个例子中,应该只有一条消息从队列中删除,第二条消息应该仍然在队列中可见。
这是预期的行为还是我遗漏了什么?
Is this the expected behavior or am I missing something here?
这是 ReceiveAndDelete
模式的预期行为。服务总线会在消息发送到客户端后立即将其删除,而不管客户端是否能够处理该消息。
从这个link
:
This operation receives a message from a queue or subscription, and removes the message from that queue or subscription in one atomic operation.
如果你想控制这个行为,可以改用 PeekLock
模式获取消息,处理消息,并在处理成功后调用消息上的 Complete
方法将其删除。
更新
我试过你的代码,这是我的观察结果:
使用
PrefetchCount = 1
,第一次从队列中取出2条消息并删除。之后,将获取并删除一条消息。可能的解释是预取了 1 条消息,根据请求获取了 1 条消息。使用
PrefetchCount = 0
(或从 processorOptions 中省略该设置),每次只获取并删除一条消息。
请尝试以下代码:
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
namespace SO67076189
{
    /// <summary>
    /// Console sample: sends five messages to a Service Bus queue and reads them
    /// back with a processor running in ReceiveAndDelete mode.
    /// </summary>
    class Program
    {
        static string connectionString = "connection-string";
        static string queueName = "queue-name";

        /// <summary>
        /// Entry point: sends five test messages, then runs the receiver.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            // No try/catch: a failure in any step propagates to the runtime unchanged.
            await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
            await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");
            await CreateMessage(queueName, "Message 3 to test 'ReceiveAndDelete'");
            await CreateMessage(queueName, "Message 4 to test 'ReceiveAndDelete'");
            await CreateMessage(queueName, "Message 5 to test 'ReceiveAndDelete'");
            await ReceiveMessagesAsync();

            // Keep the console window open until a key is pressed.
            Console.ReadKey();
        }

        /// <summary>
        /// Sends a single text message to the given Service Bus queue.
        /// </summary>
        /// <param name="queueName">Name of the queue the message is sent to.</param>
        /// <param name="textMessage">Text used as the body of the message.</param>
        /// <returns>A task that completes once the message has been sent.</returns>
        private static async Task CreateMessage(string queueName, string textMessage)
        {
            // create a Service Bus client; it is disposed asynchronously when the block exits
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the queue
                ServiceBusSender sender = client.CreateSender(queueName);

                // create a message that we can send
                ServiceBusMessage message = new ServiceBusMessage(textMessage);

                // send the message
                await sender.SendMessageAsync(message);
                Console.WriteLine($"Sent a single message to the queue: {queueName}");
            }
        }

        /// <summary>
        /// Handles a received message by writing its body to the console.
        /// </summary>
        /// <param name="args">Event data carrying the received message.</param>
        /// <returns>An already-completed task; the handler does no asynchronous work.</returns>
        /// <remarks>
        /// The message is not completed here: the processor runs in ReceiveAndDelete
        /// mode, so the message is already gone from the queue when it is delivered.
        /// </remarks>
        static Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();
            Console.WriteLine($"Received: {body}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Handles any error raised while receiving messages by writing it to the console.
        /// </summary>
        /// <param name="args">Event data carrying the exception that was raised.</param>
        /// <returns>An already-completed task.</returns>
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }

        /// <summary>
        /// Runs a queue processor in ReceiveAndDelete mode until the user presses a
        /// key, then stops it.
        /// </summary>
        /// <returns>A task that completes after the processor has been stopped.</returns>
        static async Task ReceiveMessagesAsync()
        {
            // Only the receive mode is set. PrefetchCount in particular is deliberately
            // left unset so that no message beyond the one being requested is pulled
            // from (and therefore deleted from) the queue ahead of processing.
            var processorOptions = new ServiceBusProcessorOptions
            {
                ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
            };

            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a processor that we can use to process the messages
                ServiceBusProcessor processor = client.CreateProcessor(queueName, processorOptions);

                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;

                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;

                // start processing
                await processor.StartProcessingAsync();
                Console.WriteLine("Wait for a minute and then press any key to end the processing");
                Console.ReadKey();

                // stop processing
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
        }
    }
}