MassTransit middleware - Getting instance of IServiceProvider

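I'm trying to create some FluentValidation middleware for my MassTransit pipeline. I thought this would be a fairly simple project for my first piece of MassTransit middleware, but I'm afraid I'm already out of my depth. Can anyone rescue me?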

Besides the MassTransit middleware documentation, I have been referring to a video for guidance.

Here is my filter:

public class FluentValidationFilter<TMessage> : IFilter<ConsumeContext<TMessage>>
    where TMessage : class
{
    public void Probe(ProbeContext context)
    {
        context.CreateScope(nameof(FluentValidationFilter<TMessage>));
    }


    public async Task Send(
        ConsumeContext<TMessage> context,
        IPipe<ConsumeContext<TMessage>> next)
    {
        var serviceProvider = context.GetPayload<IServiceProvider>();
        // This is always null (I think because I'm using the wrong kind of context).

        var validator = serviceProvider.GetService<IValidator<TMessage>>();

        if (validator is { })
        {
            var message = context.Message;
            var validationResult = await validator.ValidateAsync(message, context.CancellationToken);

            if (validationResult.IsValid)
                await next.Send(context);
            else
                await context.RespondAsync("That one was invalid.");
            // I'm definitely going to have to do something better here, but I'm taking it one step at a time!
        }
    }
}

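Then I have to observe the configuration of every message type and register a filter for that message type, so I made this configuration observer: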

public class FluentValidationFilterConfigurationObserver : IConsumerConfigurationObserver
{
    private readonly IConsumePipeConfigurator _pipeConfigurator;

    public FluentValidationFilterConfigurationObserver(IConsumePipeConfigurator configurator)
    {
        _pipeConfigurator = configurator ?? throw new ArgumentNullException(nameof(configurator));
    }

    public void ConsumerConfigured<TConsumer>(IConsumerConfigurator<TConsumer> configurator) where TConsumer : class
    {
        return;
    }

    public void ConsumerMessageConfigured<TConsumer, TMessage>(IConsumerMessageConfigurator<TConsumer, TMessage> messageConfigurator)
        where TConsumer : class
        where TMessage : class
    {
        _pipeConfigurator.AddPipeSpecification(
            new FilterPipeSpecification<ConsumeContext<TMessage>>(new FluentValidationFilter<TMessage>()));

        _pipeConfigurator.UseFilter(new FluentValidationFilter<TMessage>());
    }
}

Then, in Startup.ConfigureServices, I call this extension method to hook up the configuration observer:

public static class FluentValidationExtensions
{
    public static void UseFluentValidation(this IEndpointConfigurator configurator)
    {
        configurator.ConnectConsumerConfigurationObserver(new FluentValidationFilterConfigurationObserver(configurator));
    }
}

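The Send method of my FluentValidationFilter never finds an instance of IServiceProvider when it calls context.GetPayload. I read that this is because it isn't available in a ConsumeContext<TMessage>, so I tried implementing IFilter<ConsumerConsumeContext<TMessage>> instead, but that stops my ConfigurationObserver code from compiling because the filter is no longer of the required type.

I have searched for similar-sounding interfaces (like IConsumerConsumeConfigurationObserver) and similar-sounding overloads (like ConnectConsumerConsumeConfigurationObserver) that might be the right choice, but I can't find anything.

How can I use the right kind of context to get the IServiceProvider while still using my FluentValidationFilterConfigurationObserver, please?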

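If you need access to the container from middleware, I'd suggest using scoped filters. There is a sample showing how they are built.

Following Chris Patterson's suggestion, you can build a message validator filter: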

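// Namespaces below assume MassTransit v8; in v7, IFilter, IPipe and ProbeContext
// come from GreenPipes and KebabCaseEndpointNameFormatter from MassTransit.Definition.
using System;
using System.Threading.Tasks;
using FluentValidation;
using FluentValidation.Results;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

public class MessageValidatorFilter<T> : IFilter<ConsumeContext<T>>
    where T : class
{
    private readonly ILogger<MessageValidatorFilter<T>> _logger;
    private readonly IValidator<T> _validator;

    public MessageValidatorFilter(ILogger<MessageValidatorFilter<T>> logger, IServiceProvider serviceProvider)
    {
        _logger = logger;
        // GetService returns null when no validator is registered for T,
        // in which case the message is treated as valid.
        _validator = serviceProvider.GetService<IValidator<T>>();
    }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // An empty ValidationResult has no failures, so IsValid is true.
        var validationResult = _validator is not null
            ? await _validator.ValidateAsync(context.Message, context.CancellationToken)
            : new ValidationResult();

        if (validationResult.IsValid is false)
        {
            _logger.LogError("Message validation errors: {Errors}", validationResult.Errors);

            // Forward the invalid message to a per-message-type error queue
            // and stop the pipeline so the consumer never sees it.
            await context.Send(
                destinationAddress: new($"queue:yourcontext-{KebabCaseEndpointNameFormatter.Instance.SanitizeName(typeof(T).Name)}-validation-errors"),
                message: new ValidationResultMessage<T>(context.Message, validationResult));

            return;
        }

        // Valid (or unvalidated) messages continue down the pipeline.
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}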

The validation result message:

public record ValidationResultMessage<TMessage>(TMessage Message, ValidationResult ValidationResult);

How to register the filter with UseConsumeFilter:

services.AddMassTransit(cfg =>
{
    ...

    cfg.UsingRabbitMq((context, bus) =>
    {
        ...

        bus.UseConsumeFilter(typeof(MessageValidatorFilter<>), context);
        bus.ConfigureEndpoints(context);
    });
});
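
The filter above only validates messages for which an IValidator<T> is registered in the container; everything else passes straight through. As a minimal sketch, assuming a hypothetical SubmitOrder message and SubmitOrderValidator (neither name comes from the original post), registering a validator and resolving it the same way the filter does might look like this:

```csharp
using System;
using FluentValidation;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

// Register the validator so the filter's GetService<IValidator<T>>() call finds it.
services.AddScoped<IValidator<SubmitOrder>, SubmitOrderValidator>();

using var provider = services.BuildServiceProvider();
using var scope = provider.CreateScope();

// Resolve the validator from a scope, as a scoped filter would.
var validator = scope.ServiceProvider.GetService<IValidator<SubmitOrder>>();
var result = validator!.Validate(new SubmitOrder(""));

Console.WriteLine(result.IsValid); // prints "False": OrderId is empty

// Hypothetical message type, for illustration only.
public record SubmitOrder(string OrderId);

// Hypothetical validator for that message type.
public class SubmitOrderValidator : AbstractValidator<SubmitOrder>
{
    public SubmitOrderValidator()
    {
        RuleFor(x => x.OrderId).NotEmpty();
    }
}
```

In a real application the AddScoped line would sit alongside the AddMassTransit call; message types without a registered validator resolve to null and are not validated.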