预取计数 > 1 的每条消息的 MassTransit 新消费者(和 di 范围)
MassTransit new consumer (and di scope) for every message with prefetchcount > 1
我正在尝试使用 MassTransit.ExtensionsDependencyInjectionIntegration
进行设置并尝试使用 PrefetchCount。我注意到,对于多条消息,有时会使用同一个消费者。
有没有办法强制公共交通为每条消息创建消费者?
我的服务逻辑需要为每条消息分配新的 di 作用域,据我所知,只有我们每次都创建消费者才有可能。
UPD
我正在使用 this library 注册消费者和发布者。
Here you can see,它向 TryAddTrancient()
注册消费者。这可能是个问题吗?我仍然认为 di container 应该为每个请求生产消费者。
看来这不是故意的行为,我会更深入地研究我的问题。
我建议使用 MassTransit.AspNetCore
包,该包使用
请仔细阅读 documentation,其中包含示例。
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
// Add framework services
services.AddHealthChecks();
services.AddMvc();
// Register your consumers if the need dependencies
services.AddScoped<SomeDependency>()
services.AddScoped<OrderConsumer>();
// Register MassTransit
services.AddMassTransit(
provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host("localhost", host =>
{
host.Username("guest");
host.Password("guest");
});
cfg.ReceiveEndpoint(host, "submit-order", ep =>
{
ep.PrefetchCount = 16;
ep.UseMessageRetry(x => x.Interval(2, 100));
ep.Consumer<OrderConsumer>(provider);
});
}),
x => x.AddConsumer<OrderConsumer>());
}
// everything else
}
我正在尝试使用 MassTransit.ExtensionsDependencyInjectionIntegration
进行设置并尝试使用 PrefetchCount。我注意到,对于多条消息,有时会使用同一个消费者。
有没有办法强制公共交通为每条消息创建消费者?
我的服务逻辑需要为每条消息分配新的 di 作用域,据我所知,只有我们每次都创建消费者才有可能。
UPD
我正在使用 this library 注册消费者和发布者。
Here you can see,它向 TryAddTrancient()
注册消费者。这可能是个问题吗?我仍然认为 di container 应该为每个请求生产消费者。
看来这不是故意的行为,我会更深入地研究我的问题。
我建议使用 MassTransit.AspNetCore
包,该包使用
请仔细阅读 documentation,其中包含示例。
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
// Add framework services
services.AddHealthChecks();
services.AddMvc();
// Register your consumers if the need dependencies
services.AddScoped<SomeDependency>()
services.AddScoped<OrderConsumer>();
// Register MassTransit
services.AddMassTransit(
provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host("localhost", host =>
{
host.Username("guest");
host.Password("guest");
});
cfg.ReceiveEndpoint(host, "submit-order", ep =>
{
ep.PrefetchCount = 16;
ep.UseMessageRetry(x => x.Interval(2, 100));
ep.Consumer<OrderConsumer>(provider);
});
}),
x => x.AddConsumer<OrderConsumer>());
}
// everything else
}