Azure 服务总线接收和删除

Azure Service Bus ReceiveAndDelete

所以我创建了示例应用程序,其中应用程序将消息发送到队列并读取消息。对于这个应用程序,我使用“ReceiveAndDelete”,下面是示例代码

创建消息

/// <summary>
/// Sends a single text message to the given Service Bus queue.
/// </summary>
/// <param name="queueName">Name of the queue the message is sent to.</param>
/// <param name="textMessage">Text used as the body of the message.</param>
/// <returns>A task that completes after the message has been sent and the client disposed.</returns>
/// <remarks>
/// Returns <see cref="Task"/> instead of <c>void</c>: the caller awaits this method, and an
/// <c>async void</c> method can neither be awaited nor surface its exceptions to the caller.
/// </remarks>
private static async Task CreateMessage(string queueName, string textMessage)
{
    // create a Service Bus client; it is disposed asynchronously when the block exits
    await using (ServiceBusClient client = new ServiceBusClient(connectionString))
    {
        // create a sender for the queue
        ServiceBusSender sender = client.CreateSender(queueName);

        // create a message that we can send
        ServiceBusMessage message = new ServiceBusMessage(textMessage);

        // send the message
        await sender.SendMessageAsync(message);
        Console.WriteLine($"Sent a single message to the queue: {queueName}");
    }
}

接收消息

/// <summary>
/// Handles a message delivered by the processor: prints its body to the console.
/// </summary>
/// <param name="args">Event arguments carrying the received message.</param>
/// <returns>An already-completed task; the handler does no asynchronous work.</returns>
static Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    Console.WriteLine($"Received: {body}");

    // Deliberately no CompleteMessageAsync call: the processor is configured with
    // ServiceBusReceiveMode.ReceiveAndDelete, so the service has already removed the
    // message when it was delivered. Settling a message applies to PeekLock mode only.
    return Task.CompletedTask;
}

/// <summary>
/// Handles an error reported by the processor by writing the exception to the console.
/// </summary>
/// <param name="args">Event arguments carrying the exception that was raised.</param>
/// <returns>An already-completed task.</returns>
static Task ErrorHandler(ProcessErrorEventArgs args)
{
    string details = args.Exception.ToString();
    Console.WriteLine(details);

    return Task.CompletedTask;
}

/// <summary>
/// Starts a Service Bus processor on the queue in ReceiveAndDelete mode, lets it run
/// until a key is pressed, then stops it.
/// </summary>
/// <returns>A task that completes once processing has stopped and the client is disposed.</returns>
static async Task ReceiveMessagesAsync()
{
    // processor configuration: one handler call at a time, one prefetched message
    ServiceBusProcessorOptions options = new ServiceBusProcessorOptions
    {
        AutoCompleteMessages = false,
        MaxConcurrentCalls = 1,
        MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
        ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
        PrefetchCount = 1
    };

    // the client is disposed asynchronously when the method scope ends
    await using ServiceBusClient serviceBusClient = new ServiceBusClient(connectionString);

    // build the processor and wire up the message and error callbacks
    ServiceBusProcessor queueProcessor = serviceBusClient.CreateProcessor(queueName, options);
    queueProcessor.ProcessMessageAsync += MessageHandler;
    queueProcessor.ProcessErrorAsync += ErrorHandler;

    // begin pumping messages to the handlers
    await queueProcessor.StartProcessingAsync();

    // keep the processor running until the user presses a key
    Console.WriteLine("Wait for a minute and then press any key to end the processing");
    Console.ReadKey();

    // shut the processor down
    Console.WriteLine("\nStopping the receiver...");
    await queueProcessor.StopProcessingAsync();
    Console.WriteLine("Stopped receiving messages");
}

主要方法

// Service Bus connection string (masked) and the name of the queue used by the sample.
static string connectionString = "***";
static string queueName = "firstqueue";

/// <summary>
/// Entry point: sends two test messages to the queue, then starts receiving them.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
/// <returns>A task that completes when receiving has stopped and a key has been pressed.</returns>
static async Task Main(string[] args)
{
    // No try/catch here: the previous handler only rethrew the exception (and left an
    // unused local), so letting exceptions propagate directly is equivalent.
    await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
    await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");

    await ReceiveMessagesAsync();

    // keep the console window open until a key is pressed
    Console.ReadKey();
}

一切正常,但一旦应用程序调用“await processor.StartProcessingAsync();”,即使尚未处理完所有消息,也会从队列中读取所有消息。在我的示例中,队列中有两条消息,但是当调用“await processor.StartProcessingAsync();”时,消息计数变为零(基本上消息已被出队),然后它开始一条一条地处理消息。据我了解,如果消息尚未开始处理,那么它应该仍在队列中。在这个例子中,只有一条消息应该从队列中删除,第二条消息应该在队列中仍然可见。

这是预期的行为还是我遗漏了什么?

Is this the expected behavior or am I missing something here?

这是 ReceiveAndDelete 模式的预期行为。服务总线会在消息发送到客户端后立即将其删除,而不管客户端是否能够处理该消息。

引自这个链接:

This operation receives a message from a queue or subscription, and removes the message from that queue or subscription in one atomic operation.

如果你想控制这个行为,可以改用 PeekLock 模式获取消息,处理消息,并在处理成功后调用消息的 Complete 方法来删除该消息。

更新

我试过你的代码,这是我的观察结果:

  1. 使用PrefetchCount = 1,第一次从队列中取出2条消息并删除。之后,将获取并删除一条消息。可能的解释是预取了 1 条消息,根据请求获取了 1 条消息。

  2. 使用 PrefetchCount = 0(或从 `processorOptions` 中省略),获取并删除一条消息。

请尝试以下代码:

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

namespace SO67076189
{
    /// <summary>
    /// Console sample: sends five messages to a Service Bus queue and then receives them
    /// with a processor running in ReceiveAndDelete mode.
    /// </summary>
    class Program
    {
        // Placeholders: replace with a real connection string and queue name.
        static string connectionString = "connection-string";
        static string queueName = "queue-name";

        /// <summary>
        /// Entry point: sends the test messages, then starts receiving them.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            // Send "Message 1" .. "Message 5". Exceptions propagate directly; the previous
            // catch block only rethrew them and left an unused local.
            for (int i = 1; i <= 5; i++)
            {
                await CreateMessage(queueName, $"Message {i} to test 'ReceiveAndDelete'");
            }

            await ReceiveMessagesAsync();

            // keep the console window open until a key is pressed
            Console.ReadKey();
        }

        /// <summary>
        /// Sends a single text message to the given Service Bus queue.
        /// </summary>
        /// <param name="queueName">Name of the queue the message is sent to.</param>
        /// <param name="textMessage">Text used as the body of the message.</param>
        private static async Task CreateMessage(string queueName, string textMessage)
        {
            // create a Service Bus client; it is disposed asynchronously when the block exits
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the queue
                ServiceBusSender sender = client.CreateSender(queueName);

                // create a message that we can send
                ServiceBusMessage message = new ServiceBusMessage(textMessage);

                // send the message
                await sender.SendMessageAsync(message);
                Console.WriteLine($"Sent a single message to the queue: {queueName}");
            }
        }

        /// <summary>
        /// Handles a message delivered by the processor: prints its body to the console.
        /// </summary>
        /// <param name="args">Event arguments carrying the received message.</param>
        /// <returns>An already-completed task; the handler does no asynchronous work.</returns>
        static Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();
            Console.WriteLine($"Received: {body}");

            // The message is not completed here: the processor runs in ReceiveAndDelete
            // mode, so the service removes the message as soon as it is delivered.
            return Task.CompletedTask;
        }

        /// <summary>
        /// Handles an error reported by the processor by writing the exception to the console.
        /// </summary>
        /// <param name="args">Event arguments carrying the exception that was raised.</param>
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }

        /// <summary>
        /// Starts a processor on the queue in ReceiveAndDelete mode, lets it run until a key
        /// is pressed, then stops it.
        /// </summary>
        static async Task ReceiveMessagesAsync()
        {
            // Only ReceiveMode is set. AutoCompleteMessages, MaxConcurrentCalls,
            // MaxAutoLockRenewalDuration and PrefetchCount are intentionally left at their
            // defaults, so no extra message is prefetched (and deleted) ahead of processing.
            var processorOptions = new ServiceBusProcessorOptions
            {
                ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
            };

            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a processor that we can use to process the messages
                ServiceBusProcessor processor = client.CreateProcessor(queueName, processorOptions);

                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;

                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;

                // start processing
                await processor.StartProcessingAsync();

                Console.WriteLine("Wait for a minute and then press any key to end the processing");
                Console.ReadKey();

                // stop processing
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
        }
    }
}