How to get all the session IDs of a queue in Azure Service Bus with .NET Core

I am building a web application with .NET Core 2.1.1 that connects to Azure. I ran into a problem with Service Bus queues while trying to get the session IDs of a queue.

I found some code, but it is not supported in .NET Core. Here it is:

var queueClient = QueueClient.CreateFromConnectionString(AppSettings.ServiceBusConnection, queueName);
var sessions = await queueClient.GetMessageSessionsAsync();
return sessions;

I also tried this:

var connString = Configuration.GetConnectionString("servicebus");
sessionClient = new SessionClient(connString, queue,ReceiveMode.PeekLock);
List<IMessageSession> sessions=new List<IMessageSession>();
while (true)
{
    var session = await sessionClient.AcceptMessageSessionAsync();             
    if (session == null)
        break;
    sessions.Add(session);
}
return sessions;
}

But it keeps throwing a timeout exception. Can anyone help?

Here is what I tried, and it worked for me; see the screenshot below.

This is the code I used:

namespace Core.SBConsole
{
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.ServiceBus.Core;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;

    class Program
    {
        // Connection String for the namespace can be obtained from the Azure portal under the 
        // 'Shared Access policies' section.
        const string ServiceBusConnectionString = "{Connection String}";
        const string QueueName = "mvq";
        static IMessageSender messageSender;
        static ISessionClient sessionClient;
        const string SessionPrefix = "session-prefix";

        static void Main(string[] args)
        {
            MainAsync().GetAwaiter().GetResult();
        }

        static async Task MainAsync()
        {
            const int numberOfSessions = 5;
            const int numberOfMessagesPerSession = 3;

            messageSender = new MessageSender(ServiceBusConnectionString, QueueName);
            sessionClient = new SessionClient(ServiceBusConnectionString, QueueName);

            // Send messages with sessionId set
            await SendSessionMessagesAsync(numberOfSessions, numberOfMessagesPerSession);

            // Receive all Session based messages using SessionClient
            await ReceiveSessionMessagesAsync(numberOfSessions, numberOfMessagesPerSession);

            Console.WriteLine("=========================================================");
            Console.WriteLine("Completed Receiving all messages... Press any key to exit");
            Console.WriteLine("=========================================================");

            Console.ReadKey();

            await messageSender.CloseAsync();
            await sessionClient.CloseAsync();
        }

        static async Task ReceiveSessionMessagesAsync(int numberOfSessions, int messagesPerSession)
        {
            Console.WriteLine("===================================================================");
            Console.WriteLine("Accepting any available session that has pending messages");
            Console.WriteLine("===================================================================");

            for (int i = 0; i < numberOfSessions; i++)
            {
                int messagesReceivedPerSession = 0;

                // AcceptMessageSessionAsync(sessionId) tries to accept the session with that specific id,
                // for example AcceptMessageSessionAsync(SessionPrefix + i).
                // AcceptMessageSessionAsync() without parameters accepts any available session that has messages.
                IMessageSession session = await sessionClient.AcceptMessageSessionAsync();

                if (session != null)
                {
                    // Messages within a session will always arrive in order.
                    Console.WriteLine("=====================================");
                    Console.WriteLine($"Received Session: {session.SessionId}");

                    while (messagesReceivedPerSession++ < messagesPerSession)
                    {
                        Message message = await session.ReceiveAsync();

                        // ReceiveAsync returns null when no message arrives before the operation timeout.
                        if (message == null)
                        {
                            break;
                        }

                        Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");

                        // Complete the message so that it is not received again.
                        // This can be done only if the sessionClient is created in ReceiveMode.PeekLock mode (which is default).
                        await session.CompleteAsync(message.SystemProperties.LockToken);
                    }

                    Console.WriteLine($"Received all messages for Session: {session.SessionId}");
                    Console.WriteLine("=====================================");

                    // Close the Session after receiving all messages from the session
                    await session.CloseAsync();
                }
            }
        }

        static async Task SendSessionMessagesAsync(int numberOfSessions, int messagesPerSession)
        {
            // Nothing to send.
            if (numberOfSessions == 0 || messagesPerSession == 0)
            {
                return;
            }

            for (int i = numberOfSessions - 1; i >= 0; i--)
            {
                var messagesToSend = new List<Message>();
                string sessionId = SessionPrefix + i;
                for (int j = 0; j < messagesPerSession; j++)
                {
                    // Create a new message to send to the queue
                    string messageBody = "test" + j;
                    var message = new Message(Encoding.UTF8.GetBytes(messageBody));
                    // Assign a SessionId for the message
                    message.SessionId = sessionId;
                    messagesToSend.Add(message);

                    // Write the sessionId, body of the message to the console
                    Console.WriteLine($"Sending SessionId: {message.SessionId}, message: {messageBody}");
                }

                // Send a batch of messages corresponding to this sessionId to the queue
                await messageSender.SendAsync(messagesToSend);
            }

            Console.WriteLine("=====================================");
            Console.WriteLine($"Sent {messagesPerSession} messages each for {numberOfSessions} sessions.");
            Console.WriteLine("=====================================");
        }
    }
}
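The sample above accepts a fixed number of sessions. If the goal is only to list the session IDs that currently have messages, a loop like the following might work. This is a minimal sketch, not a library feature: `SessionLister`, `GetAvailableSessionIdsAsync` and the `acceptTimeout` parameter are names made up for illustration. It assumes that `AcceptMessageSessionAsync(TimeSpan)` throws `ServiceBusTimeoutException` once no further session is available, which would also explain the timeout in the question. Each accepted session is kept open until the loop ends so that the same session is not handed out twice.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

static class SessionLister
{
    // Hypothetical helper: returns the ids of sessions that have pending messages
    // and are not currently locked by another receiver.
    public static async Task<List<string>> GetAvailableSessionIdsAsync(
        string connectionString, string queueName, TimeSpan acceptTimeout)
    {
        var sessionClient = new SessionClient(connectionString, queueName, ReceiveMode.PeekLock);
        var heldSessions = new List<IMessageSession>();
        var sessionIds = new List<string>();

        try
        {
            while (true)
            {
                try
                {
                    // Accept any available session; keep it locked so it is not returned again.
                    IMessageSession session = await sessionClient.AcceptMessageSessionAsync(acceptTimeout);
                    heldSessions.Add(session);
                    sessionIds.Add(session.SessionId);
                }
                catch (ServiceBusTimeoutException)
                {
                    // Assumption: a timeout here means no more unlocked sessions with messages.
                    break;
                }
            }
        }
        finally
        {
            // Release the session locks without consuming any messages.
            foreach (IMessageSession session in heldSessions)
            {
                await session.CloseAsync();
            }

            await sessionClient.CloseAsync();
        }

        return sessionIds;
    }
}
```

A call could look like `var ids = await SessionLister.GetAvailableSessionIdsAsync(ServiceBusConnectionString, QueueName, TimeSpan.FromSeconds(5));`. Two caveats: the last iteration always waits for the full timeout, and with many sessions the locks on the earliest ones may expire before the loop finishes, so a session could in principle be listed twice.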

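Things to consider before creating the queue

1) Make sure the Service Bus namespace is not on the Free or Basic tier; if it is, scale it up to Standard.

2) Make sure sessions are enabled when you create the queue.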
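The second point can also be done from code instead of the portal. The following is a rough sketch, assuming a version of Microsoft.Azure.ServiceBus that ships `ManagementClient` and a connection string with Manage rights; `QueueSetup` and `EnsureSessionQueueAsync` are hypothetical names used only for this example.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Management;

static class QueueSetup
{
    // Hypothetical helper: creates the queue with sessions enabled if it does not exist yet.
    public static async Task EnsureSessionQueueAsync(string connectionString, string queueName)
    {
        var managementClient = new ManagementClient(connectionString);

        try
        {
            if (!await managementClient.QueueExistsAsync(queueName))
            {
                // RequiresSession has to be set when the queue is created.
                var description = new QueueDescription(queueName) { RequiresSession = true };
                await managementClient.CreateQueueAsync(description);
            }
        }
        finally
        {
            await managementClient.CloseAsync();
        }
    }
}
```

If the queue already exists without sessions, this sketch leaves it untouched; in that case the queue would most likely need to be recreated with sessions enabled.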

I am using the latest Microsoft.Azure.ServiceBus NuGet package, currently 3.4. If you are using a different package version, try upgrading or downgrading it.

Hope this helps.