从 BackgroundService 启动服务总线客户端
Start Service Bus Client from BackgroundService
我有一个 ServiceBusClient
class 创建一个 QueueClient
用来监听总线上的消息。我查看了以下文章来进行设置:
我的 ServiceBusClient
class 处理 QueueClient
看起来像这样:
public class ServiceBusClient : IServiceBusClient
{
public ServiceBusClient(IEventService eventService, ServiceBusClientOptions options)
{
...
queueClient = new QueueClient(options.ConnectionString, options.QueueName);
}
public void Run()
{
RegisterOnMessageHandler();
}
private void RegisterOnMessageHandler()
{
...
queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
private async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
var eventMessage = EventMessage.FromMessage(message);
await eventService.Write(eventMessage);
if (!token.IsCancellationRequested)
{
await queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
}
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
// log errors
...
return Task.CompletedTask;
}
}
我希望从 IHostedService
甚至扩展 BackgroundService
开始。在我发现的示例中,工作不断在 while 循环中执行,这不适合我的场景,因为我只尝试 运行 单个命令。
所以我创建了一个超级简单的实现,如下所示:
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
serviceBusClient.Run();
while (!cancellationToken.IsCancellationRequested)
{
// empty loop to keep running for lifetime of pod
}
}
如果删除 async
我显然需要 return 一些东西。我试过 Task.CompletedTask
但这需要我将 return 类型更改为 Task<Task>
.
如果我有 async
,我将需要 await
一些东西,但我不确定是什么。
这感觉不对。我假设我需要更改 ServiceBusClient
中的某些内容,但我不确定是什么,因为 ProcessMessagesAsync
是异步的,并且根据我的理解在后台进行繁重的工作。
我只想让我的网络应用程序开始侦听消息,直到它死掉。我该怎么做?
我放弃了使用 BackgroundService
并实现了 IHostedService
。
public class MessageListenerService : IHostedService
{
private readonly IServiceBusClient client;
private readonly ITelemetryClient applicationInsights;
public MessageListenerService(IServiceProvider serviceProvider)
{
client = serviceProvider.GetService<IServiceBusClient>();
applicationInsights = serviceProvider.GetService<ITelemetryClient>();
}
public Task StartAsync(CancellationToken cancellationToken)
{
applicationInsights.TrackTrace(new TraceTelemetry("MessageListenerService is starting"));
client.Run();
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
applicationInsights.TrackTrace(new TraceTelemetry("MessageListenerService is stopping"));
return client.Stop();
}
}
如果您发现此代码有问题,请在评论中告诉我,我会酌情更新。
最后我们还是为它创建了一个控制台应用程序。
我有一个 ServiceBusClient
class 创建一个 QueueClient
用来监听总线上的消息。我查看了以下文章来进行设置:
我的 ServiceBusClient
class 处理 QueueClient
看起来像这样:
public class ServiceBusClient : IServiceBusClient
{
public ServiceBusClient(IEventService eventService, ServiceBusClientOptions options)
{
...
queueClient = new QueueClient(options.ConnectionString, options.QueueName);
}
public void Run()
{
RegisterOnMessageHandler();
}
private void RegisterOnMessageHandler()
{
...
queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
private async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
var eventMessage = EventMessage.FromMessage(message);
await eventService.Write(eventMessage);
if (!token.IsCancellationRequested)
{
await queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
}
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
// log errors
...
return Task.CompletedTask;
}
}
我希望从 IHostedService
甚至扩展 BackgroundService
开始。在我发现的示例中,工作不断在 while 循环中执行,这不适合我的场景,因为我只尝试 运行 单个命令。
所以我创建了一个超级简单的实现,如下所示:
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
serviceBusClient.Run();
while (!cancellationToken.IsCancellationRequested)
{
// empty loop to keep running for lifetime of pod
}
}
如果删除 async
我显然需要 return 一些东西。我试过 Task.CompletedTask
但这需要我将 return 类型更改为 Task<Task>
.
如果我有 async
,我将需要 await
一些东西,但我不确定是什么。
这感觉不对。我假设我需要更改 ServiceBusClient
中的某些内容,但我不确定是什么,因为 ProcessMessagesAsync
是异步的,并且根据我的理解在后台进行繁重的工作。
我只想让我的网络应用程序开始侦听消息,直到它死掉。我该怎么做?
我放弃了使用 BackgroundService
并实现了 IHostedService
。
public class MessageListenerService : IHostedService
{
private readonly IServiceBusClient client;
private readonly ITelemetryClient applicationInsights;
public MessageListenerService(IServiceProvider serviceProvider)
{
client = serviceProvider.GetService<IServiceBusClient>();
applicationInsights = serviceProvider.GetService<ITelemetryClient>();
}
public Task StartAsync(CancellationToken cancellationToken)
{
applicationInsights.TrackTrace(new TraceTelemetry("MessageListenerService is starting"));
client.Run();
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
applicationInsights.TrackTrace(new TraceTelemetry("MessageListenerService is stopping"));
return client.Stop();
}
}
如果您发现此代码有问题,请在评论中告诉我,我会酌情更新。
最后我们还是为它创建了一个控制台应用程序。