Azure 服务总线主题和具有辅助角色的订阅
Azure Service Bus Topics and Subscriptions with Worker Role
所以我最近得到了使用 Service Bus Topic and Subscriptions
的需要,并且我已经阅读了许多文章和教程。我已经能够成功实施 Microsoft 的 Get started with Service Bus topics 并且还成功地使用 Visual Studio 2017's
Worker Role
模板来访问数据库。
但是,我对如何正确 "combine" 这两者感到困惑。虽然 Get started with Service Bus topics 文章展示了如何创建 2 个应用程序,一个用于发送,一个用于接收然后退出,但 Worker Role
模板似乎无休止地循环 await Task.Delay(10000);
。
我不确定如何正确地 "mesh" 这两个。本质上,我希望我的 Worker Role
保持活动状态并永远监听其订阅的条目(或直到它明显退出)。
任何指导都会很棒!
P.S.: 如果您有兴趣,我已经在 StackExchange - Software Engineering 问了一个关于我应该在我的案例场景中使用的适当技术的相关问题。
更新 #1 (2018/08/09)
基于 Visual Studio 2017
的 [=21= 阅读和接收的文章发送 Message
的一些代码]模板。
正在发送(基于Get started with Service Bus topics)
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
namespace TopicsSender {
internal static class Program {
private const string ServiceBusConnectionString = "<your_connection_string>";
private const string TopicName = "test-topic";
private static ITopicClient _topicClient;
private static void Main(string[] args) {
MainAsync().GetAwaiter().GetResult();
}
private static async Task MainAsync() {
const int numberOfMessages = 10;
_topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
Console.WriteLine("======================================================");
Console.WriteLine("Press ENTER key to exit after sending all the messages.");
Console.WriteLine("======================================================");
// Send messages.
await SendMessagesAsync(numberOfMessages);
Console.ReadKey();
await _topicClient.CloseAsync();
}
private static async Task SendMessagesAsync(int numberOfMessagesToSend) {
try {
for (var i = 0; i < numberOfMessagesToSend; i++) {
// Create a new message to send to the topic
var messageBody = $"Message {i}";
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
// Write the body of the message to the console
Console.WriteLine($"Sending message: {messageBody}");
// Send the message to the topic
await _topicClient.SendAsync(message);
}
} catch (Exception exception) {
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
}
}
}
}
接收(基于Worker Role with Service Bus Queue
模板)
using System;
using System.Diagnostics;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.ServiceRuntime;
namespace WorkerRoleWithSBQueue1 {
public class WorkerRole : RoleEntryPoint {
// The name of your queue
private const string ServiceBusConnectionString = "<your_connection_string>";
private const string TopicName = "test-topic";
private const string SubscriptionName = "test-sub1";
// QueueClient is thread-safe. Recommended that you cache
// rather than recreating it on every request
private SubscriptionClient _client;
private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);
public override void Run() {
Trace.WriteLine("Starting processing of messages");
// Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.
_client.OnMessage((receivedMessage) => {
try {
// Process the message
Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());
var message = receivedMessage.GetBody<byte[]>();
Trace.WriteLine($"Received message: SequenceNumber:{receivedMessage.SequenceNumber} Body:{message.ToString()}");
} catch (Exception e) {
// Handle any message processing specific exceptions here
Trace.Write(e.ToString());
}
});
_completedEvent.WaitOne();
}
public override bool OnStart() {
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// Initialize the connection to Service Bus Queue
_client = SubscriptionClient.CreateFromConnectionString(ServiceBusConnectionString, TopicName, SubscriptionName);
return base.OnStart();
}
public override void OnStop() {
// Close the connection to Service Bus Queue
_client.Close();
_completedEvent.Set();
base.OnStop();
}
}
}
更新 #2 (2018/08/10)
根据 Arunprabhu 的一些建议并知道我使用的是不同的库,下面是我当前的解决方案,其中包含来自多个来源的片段。有什么我忽略的东西,加上肩膀在那里等吗?目前收到一个错误,可能是针对另一个问题或已经回答的,所以在进一步研究之前不想 post 它。
using System;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.WindowsAzure.ServiceRuntime;
namespace WorkerRoleWithSBQueue1 {
public class WorkerRole : RoleEntryPoint {
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);
// The name of your queue
private const string ServiceBusConnectionString = "<your_connection_string>";
private const string TopicName = "test-topic";
private const string SubscriptionName = "test-sub1";
// _client is thread-safe. Recommended that you cache
// rather than recreating it on every request
private SubscriptionClient _client;
public override void Run() {
Trace.WriteLine("Starting processing of messages");
try {
this.RunAsync(this._cancellationTokenSource.Token).Wait();
} catch (Exception e) {
Trace.WriteLine("Exception");
Trace.WriteLine(e.ToString());
} finally {
Trace.WriteLine("Finally...");
this._runCompleteEvent.Set();
}
}
public override bool OnStart() {
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
var result = base.OnStart();
Trace.WriteLine("WorkerRole has been started");
return result;
}
public override void OnStop() {
// Close the connection to Service Bus Queue
this._cancellationTokenSource.Cancel();
this._runCompleteEvent.WaitOne();
base.OnStop();
}
private async Task RunAsync(CancellationToken cancellationToken) {
// Configure the client
RegisterOnMessageHandlerAndReceiveMessages(ServiceBusConnectionString, TopicName, SubscriptionName);
_runCompleteEvent.WaitOne();
Trace.WriteLine("Closing");
await _client.CloseAsync();
}
private void RegisterOnMessageHandlerAndReceiveMessages(string connectionString, string topicName, string subscriptionName) {
_client = new SubscriptionClient(connectionString, topicName, subscriptionName);
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) {
// Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
// Set it according to how many messages the application wants to process in parallel.
MaxConcurrentCalls = 1,
// Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
// False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
AutoComplete = false,
};
_client.RegisterMessageHandler(ProcessMessageAsync, messageHandlerOptions);
}
private async Task ProcessMessageAsync(Message message, CancellationToken token) {
try {
// Process the message
Trace.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
await _client.CompleteAsync(message.SystemProperties.LockToken);
} catch (Exception e) {
// Handle any message processing specific exceptions here
Trace.Write(e.ToString());
await _client.AbandonAsync(message.SystemProperties.LockToken);
}
}
private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) {
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
}
}
如果您使用的是 Visual Studio,则有一个默认模板可用于使用服务总线队列创建 Azure 云服务和辅助角色。您需要在 WorkerRole.cs.
中将 QueueClient 更改为 SubscriptionClient然后,工作者角色将保持活动状态,侦听来自主题订阅的消息。
您可以找到示例 here。您应该在 Cloud Service
中使用 Service Bus Queue 创建 Worker 角色考虑到更新问题 Update #1 (2018/08/09) 的复杂性,我提供了一个单独的答案。
发送方和接收方使用不同的库。
发件人 - Microsoft.Azure.ServiceBus
接收者 - WindowsAzure.ServiceBus
Microsoft.Azure.ServiceBus 的消息对象为 Message,其中 WindowsAzure.ServiceBus 有 BrokeredMessage。
Microsoft.Azure.ServiceBus 中有一种方法 RegisterMessageHandler 可用,这是 WindowsAzure.ServiceBus 中 client.OnMessage() 的替代方法。通过使用它,侦听器将消息作为 Message 对象接收。这个库如你所料支持异步编程。
请参阅 here 以获取两个库中的样本。