MassTransit middleware - Getting instance of IServiceProvider
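I'm trying to create some FluentValidation middleware for my MassTransit pipeline. I thought this would be a fairly simple project for my first piece of MassTransit middleware, but I'm afraid I'm already out of my depth. Can anybody rescue me?
Besides the MassTransit middleware documentation, I have been referring to this video for guidance.
Here's my filter: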
public class FluentValidationFilter<TMessage> : IFilter<ConsumeContext<TMessage>>
    where TMessage : class
{
    public void Probe(ProbeContext context)
    {
        context.CreateScope(nameof(FluentValidationFilter<TMessage>));
    }

    public async Task Send(
        ConsumeContext<TMessage> context,
        IPipe<ConsumeContext<TMessage>> next)
    {
        var serviceProvider = context.GetPayload<IServiceProvider>();
        // This is always null (I think because I'm using the wrong kind of context).
        var validator = serviceProvider.GetService<IValidator<TMessage>>();
        if (validator is { })
        {
            var message = context.Message;
            var validationResult = await validator.ValidateAsync(message, context.CancellationToken);
            if (validationResult.IsValid)
                await next.Send(context);
            else
                await context.RespondAsync("That one was invalid.");
            // I'm definitely going to have to do something better here, but I'm taking it one step at a time!
        }
    }
}
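Then I have to observe the configuration of consumers for any message type and register the filter for that message type, so I made this configuration observer: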
public class FluentValidationFilterConfigurationObserver : IConsumerConfigurationObserver
{
    private readonly IConsumePipeConfigurator _pipeConfigurator;

    public FluentValidationFilterConfigurationObserver(IConsumePipeConfigurator configurator)
    {
        _pipeConfigurator = configurator ?? throw new ArgumentNullException(nameof(configurator));
    }

    public void ConsumerConfigured<TConsumer>(IConsumerConfigurator<TConsumer> configurator) where TConsumer : class
    {
        return;
    }

    public void ConsumerMessageConfigured<TConsumer, TMessage>(IConsumerMessageConfigurator<TConsumer, TMessage> messageConfigurator)
        where TConsumer : class
        where TMessage : class
    {
        _pipeConfigurator.AddPipeSpecification(
            new FilterPipeSpecification<ConsumeContext<TMessage>>(new FluentValidationFilter<TMessage>()));
        _pipeConfigurator.UseFilter(new FluentValidationFilter<TMessage>());
    }
}
Then, in Startup.ConfigureServices, I call this extension method to use the configuration observer:
public static class FluentValidationExtensions
{
    public static void UseFluentValidation(this IEndpointConfigurator configurator)
    {
        configurator.ConnectConsumerConfigurationObserver(new FluentValidationFilterConfigurationObserver(configurator));
    }
}
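The Send method of my FluentValidationFilter never finds an instance of IServiceProvider when it calls context.GetPayload. I read that this is because it isn't available in ConsumeContext<TMessage>, so I tried implementing IFilter<ConsumerConsumeContext<TMessage>> instead, but that stopped my ConfigurationObserver code from compiling because the filter was no longer of the required type.
I searched for similar-sounding interfaces (such as IConsumerConsumeConfigurationObserver) and similar-sounding overloads (such as ConnectConsumerConsumeConfigurationObserver) that might be the right ones to use, but I couldn't find anything.
How can I use the right kind of context to get the IServiceProvider while still using my FluentValidationFilterConfigurationObserver, please?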
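If you need to access the container from middleware, I'd suggest using scoped filters. There is a sample that shows how they are built.
Following Chris Patterson's suggestion, you can build a message validator filter: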
public class MessageValidatorFilter<T> : IFilter<ConsumeContext<T>>
    where T : class
{
    private readonly ILogger<MessageValidatorFilter<T>> _logger;
    private readonly IValidator<T> _validator;

    public MessageValidatorFilter(ILogger<MessageValidatorFilter<T>> logger, IServiceProvider serviceProvider)
    {
        _logger = logger;
        _validator = serviceProvider.GetService<IValidator<T>>();
    }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        var validationResult = _validator is not null
            ? await _validator.ValidateAsync(context.Message, context.CancellationToken)
            : new ValidationResult();

        if (validationResult.IsValid is false)
        {
            _logger.LogError("Message validation errors: {Errors}", validationResult.Errors);

            await context.Send(
                destinationAddress: new($"queue:yourcontext-{KebabCaseEndpointNameFormatter.Instance.SanitizeName(typeof(T).Name)}-validation-errors"),
                message: new ValidationResultMessage<T>(context.Message, validationResult));

            return;
        }

        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}
The validation result message:
public record ValidationResultMessage<TMessage>(TMessage Message, ValidationResult ValidationResult);
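As a possible variation on the filter above, the validators can be taken directly through the constructor instead of being looked up on IServiceProvider. The sketch below is only an illustration, not the answer's method: the class name InjectedValidatorFilter is made up, it assumes MassTransit v8 namespaces (on v7 the pipe types live in GreenPipes), and it assumes that throwing on an invalid message is acceptable in your system, in place of forwarding to a validation-errors queue.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentValidation;
using MassTransit;

// Hypothetical variant of MessageValidatorFilter<T>: validators arrive through
// the constructor, so no IServiceProvider lookup is needed.
public class InjectedValidatorFilter<T> : IFilter<ConsumeContext<T>>
    where T : class
{
    private readonly IReadOnlyList<IValidator<T>> _validators;

    // IEnumerable<IValidator<T>> resolves to an empty sequence when nothing is
    // registered for T, so message types without a validator pass straight through.
    public InjectedValidatorFilter(IEnumerable<IValidator<T>> validators)
    {
        _validators = validators.ToList();
    }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        foreach (var validator in _validators)
        {
            var result = await validator.ValidateAsync(context.Message, context.CancellationToken);
            if (!result.IsValid)
            {
                // Assumption: failing the message is acceptable here. The exception
                // propagates up the pipeline like any exception thrown by a consumer.
                throw new ValidationException(result.Errors);
            }
        }

        // All registered validators passed (or none exist): continue down the pipe.
        await next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
        context.CreateScope(nameof(InjectedValidatorFilter<T>));
    }
}
```

It would be registered the same way as the filter above, by passing typeof(InjectedValidatorFilter<>) to UseConsumeFilter. Whether throwing or forwarding to a separate queue is the better choice depends on how you want invalid messages to be handled downstream.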
How to use the consume filter:
services.AddMassTransit(cfg =>
{
    ...
    cfg.UsingRabbitMq((context, bus) =>
    {
        ...
        bus.UseConsumeFilter(typeof(MessageValidatorFilter<>), context);
        bus.ConfigureEndpoints(context);
    });
});
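The filter only validates message types for which an IValidator<T> is registered in the container. As a minimal sketch of that registration, the snippet below uses a hypothetical SubmitOrder message, a hypothetical SubmitOrderValidator, and a hypothetical AddMessageValidators helper; none of these names come from the original post.

```csharp
using FluentValidation;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical message contract, used only for illustration.
public record SubmitOrder(string OrderId, int Quantity);

// Hypothetical validator for that message.
public class SubmitOrderValidator : AbstractValidator<SubmitOrder>
{
    public SubmitOrderValidator()
    {
        RuleFor(x => x.OrderId).NotEmpty();
        RuleFor(x => x.Quantity).GreaterThan(0);
    }
}

public static class ValidatorRegistration
{
    // Registers the validator under IValidator<SubmitOrder>, which is the
    // service type the filter asks the container for.
    public static IServiceCollection AddMessageValidators(this IServiceCollection services)
    {
        services.AddScoped<IValidator<SubmitOrder>, SubmitOrderValidator>();
        return services;
    }
}
```

Calling services.AddMessageValidators() alongside services.AddMassTransit(...) would then let the filter resolve the validator when a SubmitOrder message arrives. Message types with no registered validator are passed on unvalidated.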