Azure Service Bus Topics and Subscriptions with Worker Role

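I recently needed to use Service Bus Topics and Subscriptions, and I have read many articles and tutorials. I have successfully implemented Microsoft's Get started with Service Bus topics sample, and I have also used Visual Studio 2017's Worker Role template to access a database.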

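However, I'm confused about how to properly "combine" the two. The Get started with Service Bus topics article shows how to create two applications, one that sends and one that receives and then quits, whereas the Worker Role template seems to loop endlessly on await Task.Delay(10000);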

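I'm not sure how to "mesh" the two correctly. Essentially, I want my Worker Role to stay alive and listen for entries on its subscription forever (or until it is explicitly shut down).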

Any guidance would be great!

P.S.: If you are interested, I have asked a related question on StackExchange - Software Engineering about the appropriate technology for my scenario.

Update #1 (2018/08/09)

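Based on the answer below, here is some code showing how I send a Message (based on the articles I read) and how I receive it using Visual Studio 2017's Worker Role with Service Bus Queue template.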

Sending (based on Get started with Service Bus topics)

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

namespace TopicsSender {
    internal static class Program {
        private const string ServiceBusConnectionString = "<your_connection_string>";
        private const string TopicName = "test-topic";
        private static ITopicClient _topicClient;

        private static void Main(string[] args) {
            MainAsync().GetAwaiter().GetResult();
        }

        private static async Task MainAsync() {
            const int numberOfMessages = 10;
            _topicClient = new TopicClient(ServiceBusConnectionString, TopicName);

            Console.WriteLine("======================================================");
            Console.WriteLine("Press ENTER key to exit after sending all the messages.");
            Console.WriteLine("======================================================");

            // Send messages.
            await SendMessagesAsync(numberOfMessages);

            Console.ReadKey();

            await _topicClient.CloseAsync();
        }

        private static async Task SendMessagesAsync(int numberOfMessagesToSend) {
            try {
                for (var i = 0; i < numberOfMessagesToSend; i++) {
                    // Create a new message to send to the topic
                    var messageBody = $"Message {i}";
                    var message = new Message(Encoding.UTF8.GetBytes(messageBody));

                    // Write the body of the message to the console
                    Console.WriteLine($"Sending message: {messageBody}");

                    // Send the message to the topic
                    await _topicClient.SendAsync(message);
                }
            } catch (Exception exception) {
                Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
            }
        }
    }
}

Receiving (based on the Worker Role with Service Bus Queue template)

using System;
using System.Diagnostics;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace WorkerRoleWithSBQueue1 {
    public class WorkerRole : RoleEntryPoint {
        // Connection string, topic name and subscription name
        private const string ServiceBusConnectionString = "<your_connection_string>";
        private const string TopicName = "test-topic";
        private const string SubscriptionName = "test-sub1";

        // SubscriptionClient is thread-safe. Recommended that you cache 
        // rather than recreating it on every request
        private SubscriptionClient _client;
        private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);

        public override void Run() {
            Trace.WriteLine("Starting processing of messages");

            // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.
            _client.OnMessage((receivedMessage) => {
                try {
                    // Process the message
                    Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());
                    var message = receivedMessage.GetBody<byte[]>();
                    Trace.WriteLine($"Received message: SequenceNumber:{receivedMessage.SequenceNumber} Body:{message.ToString()}");
                } catch (Exception e) {
                    // Handle any message processing specific exceptions here
                    Trace.Write(e.ToString());
                }
            });

            _completedEvent.WaitOne();
        }

        public override bool OnStart() {
            // Set the maximum number of concurrent connections 
            ServicePointManager.DefaultConnectionLimit = 12;

            // Initialize the connection to the Service Bus topic subscription
            _client = SubscriptionClient.CreateFromConnectionString(ServiceBusConnectionString, TopicName, SubscriptionName);
            return base.OnStart();
        }

        public override void OnStop() {
            // Close the connection to the Service Bus topic subscription
            _client.Close();
            _completedEvent.Set();
            base.OnStop();
        }
    }
}

Update #2 (2018/08/10)

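Following some suggestions from Arunprabhu, and now knowing that I was using different libraries, below is my current solution, assembled from pieces taken from several sources. Is there anything I'm overlooking, or anything I've added that shouldn't be there? I'm currently getting an error that may belong in another question or may already be answered, so I don't want to post it before researching further.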

using System;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace WorkerRoleWithSBQueue1 {
    public class WorkerRole : RoleEntryPoint {
        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
        private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);

        // Connection string, topic name and subscription name
        private const string ServiceBusConnectionString = "<your_connection_string>";
        private const string TopicName = "test-topic";
        private const string SubscriptionName = "test-sub1";

        // _client is thread-safe. Recommended that you cache 
        // rather than recreating it on every request
        private SubscriptionClient _client;

        public override void Run() {
            Trace.WriteLine("Starting processing of messages");

            try {
                this.RunAsync(this._cancellationTokenSource.Token).Wait();
            } catch (Exception e) {
                Trace.WriteLine("Exception");
                Trace.WriteLine(e.ToString());
            } finally {
                Trace.WriteLine("Finally...");
                this._runCompleteEvent.Set();
            }
        }

        public override bool OnStart() {
            // Set the maximum number of concurrent connections 
            ServicePointManager.DefaultConnectionLimit = 12;

            var result = base.OnStart();

            Trace.WriteLine("WorkerRole has been started");

            return result;
        }

        public override void OnStop() {
            // Signal RunAsync to stop, then wait for Run() to finish
            this._cancellationTokenSource.Cancel();
            this._runCompleteEvent.WaitOne();

            base.OnStop();
        }

        private async Task RunAsync(CancellationToken cancellationToken) {
            // Configure the client
            RegisterOnMessageHandlerAndReceiveMessages(ServiceBusConnectionString, TopicName, SubscriptionName);

            _runCompleteEvent.WaitOne();

            Trace.WriteLine("Closing");
            await _client.CloseAsync();
        }

        private void RegisterOnMessageHandlerAndReceiveMessages(string connectionString, string topicName, string subscriptionName) {
            _client = new SubscriptionClient(connectionString, topicName, subscriptionName);

            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) {
                // Maximum number of concurrent calls to the callback ProcessMessageAsync(), set to 1 for simplicity.
                // Set it according to how many messages the application wants to process in parallel.
                MaxConcurrentCalls = 1,

                // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
                // False below indicates the Complete will be handled by the User Callback as in `ProcessMessageAsync` below.
                AutoComplete = false,
            };

            _client.RegisterMessageHandler(ProcessMessageAsync, messageHandlerOptions);
        }

        private async Task ProcessMessageAsync(Message message, CancellationToken token) {
            try {
                // Process the message
                Trace.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
                await _client.CompleteAsync(message.SystemProperties.LockToken);
            } catch (Exception e) {
                // Handle any message processing specific exceptions here
                Trace.Write(e.ToString());
                await _client.AbandonAsync(message.SystemProperties.LockToken);
            }
        }

        private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) {
            Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
            Console.WriteLine("Exception context for troubleshooting:");
            Console.WriteLine($"- Endpoint: {context.Endpoint}");
            Console.WriteLine($"- Entity Path: {context.EntityPath}");
            Console.WriteLine($"- Executing Action: {context.Action}");
            return Task.CompletedTask;
        }
    }
}

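If you are using Visual Studio, there is a default template for creating an Azure Cloud Service with a Worker Role with Service Bus Queue. In WorkerRole.cs, you need to change QueueClient to SubscriptionClient.

The worker role will then stay active, listening for messages from the topic subscription.

You can find a sample here. You should create the Worker Role with Service Bus Queue in the Cloud Service.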

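Given the complexity of the updated question, Update #1 (2018/08/09), I am providing a separate answer.

The sender and the receiver are using different libraries.

Sender - Microsoft.Azure.ServiceBus

Receiver - WindowsAzure.ServiceBus

The message object in Microsoft.Azure.ServiceBus is Message, whereas WindowsAzure.ServiceBus uses BrokeredMessage.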

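Microsoft.Azure.ServiceBus provides a RegisterMessageHandler method, which is the alternative to client.OnMessage() in WindowsAzure.ServiceBus. With it, the listener receives each message as a Message object. This library supports asynchronous programming, as you would expect.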
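As a rough sketch of how the role could stay alive once a handler is registered: RegisterMessageHandler returns immediately and the message pump runs in the background, so Run() only needs to block until OnStop() cancels a token. Note that in Update #2, RunAsync waits on _runCompleteEvent, which Run() sets only after RunAsync has returned, so that wait can never end. The sketch below waits on the CancellationToken instead. It uses only the .NET base library; WorkerLifetimeSketch, WaitForStopAsync, and the register/closeAsync delegates are hypothetical names standing in for the RegisterMessageHandler call and _client.CloseAsync().

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WorkerLifetimeSketch
{
    // Completes (without throwing) once the token is cancelled.
    public static async Task WaitForStopAsync(CancellationToken token)
    {
        try
        {
            // Timeout.Infinite means "wait until cancelled".
            await Task.Delay(Timeout.Infinite, token);
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown; nothing to do.
        }
    }

    // Hypothetical stand-in for RunAsync in the worker role:
    // register the handler, wait for the stop signal, then close the client.
    public static async Task RunAsync(
        CancellationToken token, Action register, Func<Task> closeAsync)
    {
        register();                     // e.g. _client.RegisterMessageHandler(...)
        await WaitForStopAsync(token);  // the pump keeps delivering messages meanwhile
        await closeAsync();             // e.g. _client.CloseAsync()
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            Task run = RunAsync(
                cts.Token,
                () => Console.WriteLine("handler registered"),
                () => { Console.WriteLine("client closed"); return Task.CompletedTask; });

            // Stands in for OnStop() calling _cancellationTokenSource.Cancel().
            cts.CancelAfter(TimeSpan.FromMilliseconds(200));

            // Stands in for Run() blocking on RunAsync(...).Wait().
            run.GetAwaiter().GetResult();
            Console.WriteLine("Run() returned");
        }
    }
}
```

In the worker role itself, Run() would still set _runCompleteEvent in its finally block, and OnStop() would cancel the token and then wait on that event, as in Update #2.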

See here for samples using both libraries.