Azure 服务总线队列:如何从队列中读取单个消息

Azure Service Bus Queues: How To Read Individual Messages From A Queue

我有一个应用程序,我可以在我的流程的一部分中以 JSON 格式将消息写入 Azure 服务总线队列。我有一个下游进程,我想将该消息从队列中弹出,将 json 转换为一个对象,然后处理该对象。

我可以毫无问题地将消息推送到队列中,但我无法找到任何一次从队列中弹出消息或循环弹出消息的示例。我在 Microsoft 或 Github 上看到的每个示例都是一个控制台应用程序(在 Web 应用程序中无用),它设置某种侦听器来获取队列中的所有消息并写入控制台消息。我没有找到弹出消息然后对数据进行某些处理的示例。有没有人有关于如何从队列中弹出消息然后处理它或调用另一种方法对消息中的数据执行某些操作的示例?

更新: 我使用了下面由 Guru Pasupathy 提供的 WindowsAzure.ServiceBus 示例,并使用来自 的以下代码片段结束,以从 BrokeredMessage 对象获取消息文本:

Stream stream = message.GetBody<Stream>();
StreamReader reader = new StreamReader(stream);
string messageBody =  reader.ReadToEnd();

然后我可以获取 messageBody 并将嵌入的 JSON 反序列化为一个 POCO 对象,我正在路上!现在我可以在我的应用程序中更有效地使用队列来完成各种任务。

您可以使用 Peek Lock 接收模式从队列中获取消息,处理它,然后您可以根据您的业务逻辑选择放弃或完成它。

如果您正在使用 WindowsAzure.ServiceBus nuget 包发送/接收消息,可以使用以下方法根据无需使用侦听器即可循环或单次调用。

        public void Receive()
        {
            QueueClient myQueueClient = QueueClient.CreateFromConnectionString("<connectionString>;<queueName>", ReceiveMode.PeekLock);

            int someCount = 2; //some random value for testing
            try
            {
                for (int i = 0; i < someCount; i++)
                {
                    BrokeredMessage message = myQueueClient.Receive();
                    Console.WriteLine("The message is " + message);
                    message.Complete();
                }
            }
            catch(Exception e)
            {
                //Handle your expection
            }

        }

如果您使用的是 Microsoft.Azure.Service nuget 包,那么我找不到不使用侦听器只读取单个消息的直接方法。我看到侦听器将继续轮询和处理队列,直到没有更多消息为止。

如果您的要求是停止轮询并继续获取所有可用消息,那么作为一种解决方法,您可以在读取一条消息后关闭 QueueClient 实例,并在您的进程准备好接收消息时打开 qc 和注册处理程序下一个。

        public async Task ProcessMessagesAsync(Message message, CancellationToken token)
        {
            Console.WriteLine($"Received message: {Encoding.UTF8.GetString(message.Body)}");

            BinaryFormatter bf = new BinaryFormatter();
            using (MemoryStream ms = new MemoryStream(message.Body))
            {
                Payload payload = (Payload) bf.Deserialize(ms);

                //Based on your needs you may have a condition here based on which you could Abandon or Complete the mesage

                await qc.CompleteAsync(message.SystemProperties.LockToken);
                Console.WriteLine("Completed the message --> " + payload.Message + " -- Id --> " + payload.Id);

                //await qc.AbandonAsync(message.SystemProperties.LockToken);
                //Console.WriteLine("Abandon the message --> " + payload.Message + " -- Id --> " + payload.Id);

                qc.CloseAsync(); //If you close the QueueClient instance here, no more messages will be picked up from queue.
            }
        }

上面的例子是基于 https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues#receive-messages-from-the-queue ,我只是在最后添加了 qc.CloseAsync() 调用。如果没有这一行,侦听器将继续处理,直到队列中没有更多消息为止。我不确定是否有更好的方法来实现这一点,但想到分享。

希望对您有所帮助

编辑 -

发送消息时,如果您使用的是自定义类型,则可以使用以下

                BrokeredMessage message = new BrokeredMessage(new Payload() { Id = 4332, Message = "WindowsAzure package" });
                myQueueClient.Send(message);

并且在接收时您应该使用如下所示的 GetBody

                BrokeredMessage message = myQueueClient.Receive();
                var incoming = message.GetBody<Payload>();
                Console.WriteLine("The message is " + incoming.Id + " and " + incoming.Message);
                message.Complete();

以下是自定义对象供您参考

    [Serializable]
    public class Payload
    {
        public int Id { get; set; }
        public string Message { get; set; }
    }

同样适用于字符串

接收时可以使用

var incoming = message.GetBody<string>();

发送时您可以发送为

BrokeredMessage message = new BrokeredMessage("WindowsAzure package" );

您可以在下方获得有关不同内容格式的更多详细信息 link https://abhishekrlal.com/2012/03/30/formatting-the-content-for-service-bus-messages/