如何为 ASP.NET 核心服务提供商编写 EasyNetQ 自动订阅者调度程序?
How to write an EasyNetQ auto subscriber dispatcher for the ASP.NET Core services provider?
因此,EasyNetQ 自动订阅者有一个基本的默认调度程序,无法使用非无参数构造函数创建消息消费者类。
要查看实际效果,请创建一个具有所需依赖项的使用者。您可以设置自己的服务,也可以使用 ILogger<T>
,它由框架默认值自动注册。
ConsumeTextMessage.cs
public class ConsumeTextMessage : IConsume<TextMessage>
{
private readonly ILogger<ConsumeTextMessage> logger;
public ConsumeTextMessage(ILogger<ConsumeTextMessage> logger)
{
this.logger = logger;
}
public void Consume(TextMessage message)
{
...
}
}
连接自动订户(这里有一些余地,就 where/when 到 write/run 此代码)。
Startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
}
(在其他地方,可能是 startup.Configure
或 BackgroundService
)
var subscriber = new AutoSubscriber(bus, "example");
subscriber.Subscribe(Assembly.GetExecutingAssembly());
现在,启动程序并发布一些消息,您应该会看到每条消息都在默认错误队列中结束。
System.MissingMethodException: No parameterless constructor defined for this object.
at System.RuntimeTypeHandle.CreateInstance(RuntimeType type, Boolean publicOnly, Boolean wrapExceptions, Boolean& canBeCached, RuntimeMethodHandleInternal& ctor)
at System.RuntimeType.CreateInstanceSlow(Boolean publicOnly, Boolean wrapExceptions, Boolean skipCheckThis, Boolean fillCache)
at System.Activator.CreateInstance[T]()
at EasyNetQ.AutoSubscribe.DefaultAutoSubscriberMessageDispatcher.DispatchAsync[TMessage,TAsyncConsumer](TMessage message)
at EasyNetQ.Consumer.HandlerRunner.InvokeUserMessageHandlerInternalAsync(ConsumerExecutionContext context)
我知道我可以提供我的 own dispatcher,但我们如何才能与 ASP.NET 核心服务提供商合作;确保这适用于范围内的服务?
所以,这就是我的想法。
public class MessageDispatcher : IAutoSubscriberMessageDispatcher
{
private readonly IServiceProvider provider;
public MessageDispatcher(IServiceProvider provider)
{
this.provider = provider;
}
public void Dispatch<TMessage, TConsumer>(TMessage message)
where TMessage : class
where TConsumer : class, IConsume<TMessage>
{
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
consumer.Consume(message);
}
}
public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
where TMessage : class
where TConsumer : class, IConsumeAsync<TMessage>
{
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
await consumer.ConsumeAsync(message);
}
}
}
几个值得注意的点...
IServiceProvider
依赖项是 ASP.NET 核心 DI 容器。一开始可能不清楚,因为在整个 Startup.ConfigureServices()
中,您正在使用另一个接口 IServiceCollection
.
注册类型
public MessageDispatcher(IServiceProvider provider)
{
this.provider = provider;
}
为了解析作用域服务,您需要创建和管理作用域的生命周期围绕创建和使用消费者。我正在使用 GetRequiredService<T>
扩展方法,因为我真的想要一个讨厌的异常,而不是一个空引用,它可能会在我们注意到之前泄漏一段时间(以空引用异常的形式)。
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
consumer.Consume(message);
}
如果您只直接使用 provider
,如 provider.GetRequiredService<T>()
,您在尝试解析作用域消费者或消费者的作用域依赖项时会看到这样的错误。
Exception thrown: 'System.InvalidOperationException' in Microsoft.Extensions.DependencyInjection.dll: 'Cannot resolve scoped service 'Example.Messages.ConsumeTextMessage' from root provider.'
为了解析作用域服务并为异步消费者正确维护它们的生命周期,您需要在正确的位置获取async/await关键字。您应该等待 ConsumeAsync
调用,这要求该方法是异步的。在 await 行和您的消费者中使用断点和步骤 line-by-line 以更好地处理这个问题!
public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
where TMessage : class
where TConsumer : class, IConsumeAsync<TMessage>
{
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
await consumer.ConsumeAsync(message);
}
}
好的,现在我们有了调度程序,我们只需要在 Startup 中正确设置所有内容。我们需要解决 提供者的调度程序,以便提供者可以正确地提供自己。这只是一种方法。
Startup.cs
public void ConfigureServices(IServiceCollection services)
{
// messaging
services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
services.AddSingleton<MessageDispatcher>();
services.AddSingleton<AutoSubscriber>(provider =>
{
var subscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), "example")
{
AutoSubscriberMessageDispatcher = provider.GetRequiredService<MessageDispatcher>();
}
});
// message handlers
services.AddScoped<ConsumeTextMessage>();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.ApplicationServices.GetRequiredServices<AutoSubscriber>().SubscribeAsync(Assembly.GetExecutingAssembly());
}
因此,EasyNetQ 自动订阅者有一个基本的默认调度程序,无法使用非无参数构造函数创建消息消费者类。
要查看实际效果,请创建一个具有所需依赖项的使用者。您可以设置自己的服务,也可以使用 ILogger<T>
,它由框架默认值自动注册。
ConsumeTextMessage.cs
public class ConsumeTextMessage : IConsume<TextMessage>
{
private readonly ILogger<ConsumeTextMessage> logger;
public ConsumeTextMessage(ILogger<ConsumeTextMessage> logger)
{
this.logger = logger;
}
public void Consume(TextMessage message)
{
...
}
}
连接自动订户(这里有一些余地,就 where/when 到 write/run 此代码)。
Startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
}
(在其他地方,可能是 startup.Configure
或 BackgroundService
)
var subscriber = new AutoSubscriber(bus, "example");
subscriber.Subscribe(Assembly.GetExecutingAssembly());
现在,启动程序并发布一些消息,您应该会看到每条消息都在默认错误队列中结束。
System.MissingMethodException: No parameterless constructor defined for this object.
at System.RuntimeTypeHandle.CreateInstance(RuntimeType type, Boolean publicOnly, Boolean wrapExceptions, Boolean& canBeCached, RuntimeMethodHandleInternal& ctor)
at System.RuntimeType.CreateInstanceSlow(Boolean publicOnly, Boolean wrapExceptions, Boolean skipCheckThis, Boolean fillCache)
at System.Activator.CreateInstance[T]()
at EasyNetQ.AutoSubscribe.DefaultAutoSubscriberMessageDispatcher.DispatchAsync[TMessage,TAsyncConsumer](TMessage message)
at EasyNetQ.Consumer.HandlerRunner.InvokeUserMessageHandlerInternalAsync(ConsumerExecutionContext context)
我知道我可以提供我的 own dispatcher,但我们如何才能与 ASP.NET 核心服务提供商合作;确保这适用于范围内的服务?
所以,这就是我的想法。
public class MessageDispatcher : IAutoSubscriberMessageDispatcher
{
private readonly IServiceProvider provider;
public MessageDispatcher(IServiceProvider provider)
{
this.provider = provider;
}
public void Dispatch<TMessage, TConsumer>(TMessage message)
where TMessage : class
where TConsumer : class, IConsume<TMessage>
{
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
consumer.Consume(message);
}
}
public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
where TMessage : class
where TConsumer : class, IConsumeAsync<TMessage>
{
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
await consumer.ConsumeAsync(message);
}
}
}
几个值得注意的点...
IServiceProvider
依赖项是 ASP.NET 核心 DI 容器。一开始可能不清楚,因为在整个 Startup.ConfigureServices()
中,您正在使用另一个接口 IServiceCollection
.
public MessageDispatcher(IServiceProvider provider)
{
this.provider = provider;
}
为了解析作用域服务,您需要创建和管理作用域的生命周期围绕创建和使用消费者。我正在使用 GetRequiredService<T>
扩展方法,因为我真的想要一个讨厌的异常,而不是一个空引用,它可能会在我们注意到之前泄漏一段时间(以空引用异常的形式)。
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
consumer.Consume(message);
}
如果您只直接使用 provider
,如 provider.GetRequiredService<T>()
,您在尝试解析作用域消费者或消费者的作用域依赖项时会看到这样的错误。
Exception thrown: 'System.InvalidOperationException' in Microsoft.Extensions.DependencyInjection.dll: 'Cannot resolve scoped service 'Example.Messages.ConsumeTextMessage' from root provider.'
为了解析作用域服务并为异步消费者正确维护它们的生命周期,您需要在正确的位置获取async/await关键字。您应该等待 ConsumeAsync
调用,这要求该方法是异步的。在 await 行和您的消费者中使用断点和步骤 line-by-line 以更好地处理这个问题!
public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
where TMessage : class
where TConsumer : class, IConsumeAsync<TMessage>
{
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
await consumer.ConsumeAsync(message);
}
}
好的,现在我们有了调度程序,我们只需要在 Startup 中正确设置所有内容。我们需要解决 提供者的调度程序,以便提供者可以正确地提供自己。这只是一种方法。
Startup.cs
public void ConfigureServices(IServiceCollection services)
{
// messaging
services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
services.AddSingleton<MessageDispatcher>();
services.AddSingleton<AutoSubscriber>(provider =>
{
var subscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), "example")
{
AutoSubscriberMessageDispatcher = provider.GetRequiredService<MessageDispatcher>();
}
});
// message handlers
services.AddScoped<ConsumeTextMessage>();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.ApplicationServices.GetRequiredServices<AutoSubscriber>().SubscribeAsync(Assembly.GetExecutingAssembly());
}