BatchConsumer 的 ConsumerDefinition 中的 MassTransit UseMessageRetry 在启动时抛出错误

MassTransit UseMessageRetry in ConsumerDefinition for BatchConsumer throws error on startup

我正在尝试为 Azure 服务总线主题订阅设置批处理使用者。并没有真正让它发挥作用。使用下面的设置,由于 BatchConsumerDefinition.

中的消息重试配置,启动时会出现异常
  1. 我是不是做错了什么?
  2. 如果我禁用重试,它就会开始。但是失败的消息不会在订阅的 DLQ 上结束。这是为什么?

谢谢!

services.AddMassTransit(configurator =>
{
    configurator.AddConsumer<BatchMessageEventConsumer, BatchConsumerDefinition>();

    configurator.UsingAzureServiceBus((context, factoryConfigurator) =>
    {
        factoryConfigurator.Host(asbConnectionString);

        factoryConfigurator.SubscriptionEndpoint<IMessageEvent>("my-subscription", endpointConfigurator =>
        {
            endpointConfigurator.ConfigureConsumer<BatchMessageEventConsumer>(context);
            endpointConfigurator.Batch<IMessageEvent>(x =>
            {
                x.Consumer<BatchMessageEventConsumer, IMessageEvent>(context); // provider, context, etc.
            });
        });
    });
});

services.AddMassTransitHostedService(true);
public class BatchConsumerDefinition : ConsumerDefinition<BatchMessageEventConsumer>
{
    public BatchConsumerDefinition()
    {
        Endpoint(x => x.PrefetchCount = 1000);
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<BatchMessageEventConsumer> consumerConfigurator)
    {
        // This will cause startup to throw: "System.ArgumentException: 'The type or method has 1 generic parameter(s), but 2 generic argument(s) were provided. A generic argument must be provided for each generic parameter.'"
        consumerConfigurator.UseMessageRetry(x => x.Interval(2, 100));

        consumerConfigurator.Options<BatchOptions>(options => options
            .SetMessageLimit(100)
            .SetTimeLimit(1000)
            .SetConcurrencyLimit(10));
    }
}
System.ArgumentException: 'The type or method has 1 generic parameter(s), but 2 generic argument(s) were provided. A generic argument must be provided for each generic parameter.'

   at System.RuntimeType.SanityCheckGenericArguments(RuntimeType[] genericArguments, RuntimeType[] genericParamters)
   at System.Reflection.RuntimeMethodInfo.MakeGenericMethod(Type[] methodInstantiation)
   at MassTransit.PipeConfigurators.MessageRetryConsumerConfigurationObserver`1.MassTransit.ConsumeConfigurators.IConsumerConfigurationObserver.ConsumerMessageConfigured[T,TMessage](IConsumerMessageConfigurator`2 configurator)
   at MassTransit.ConsumeConfigurators.ConsumerConfigurationObservable.<>c__DisplayClass1_0`2.<ConsumerMessageConfigured>b__0(IConsumerConfigurationObserver observer)
   at GreenPipes.Util.Connectable`1.All(Func`2 callback)
   at MassTransit.ConsumeConfigurators.ConsumerConfigurationObservable.ConsumerMessageConfigured[TConsumer,TMessage](IConsumerMessageConfigurator`2 configurator)
   at MassTransit.ConsumerSpecifications.BatchConsumerMessageSpecification`2.<>c__DisplayClass7_0.<Validate>b__0(IConsumerConfigurationObserver observer)
   at GreenPipes.Util.Connectable`1.All(Func`2 callback)
   at MassTransit.ConsumerSpecifications.BatchConsumerMessageSpecification`2.Validate()
   at MassTransit.ConsumerSpecifications.ConsumerSpecification`1.<>c.<Validate>b__7_1(IConsumerMessageSpecification`1 x)
   at System.Linq.Enumerable.SelectManySingleSelectorIterator`2.MoveNext()
   at MassTransit.ConsumerSpecifications.ConsumerSpecification`1.<Validate>d__7.MoveNext()
   at System.Linq.Enumerable.ConcatIterator`1.MoveNext()
   at System.Linq.Enumerable.SelectManySingleSelectorIterator`2.MoveNext()
   at MassTransit.Configuration.ReceiveEndpointConfiguration.<Validate>d__24.MoveNext()
   at System.Linq.Enumerable.ConcatIterator`1.MoveNext()
   at System.Linq.Enumerable.ConcatIterator`1.MoveNext()
   at System.Linq.Enumerable.SelectManySingleSelectorIterator`2.MoveNext()
   at System.Collections.Generic.List`1.InsertRange(Int32 index, IEnumerable`1 collection)
   at System.Collections.Generic.List`1.AddRange(IEnumerable`1 collection)
   at System.Linq.Enumerable.ConcatIterator`1.ToList()
   at System.Linq.Enumerable.ToList[TSource](IEnumerable`1 source)
   at MassTransit.Configurators.BusConfigurationResult..ctor(IEnumerable`1 results)
   at MassTransit.Configurators.BusConfigurationResult.CompileResults(IEnumerable`1 results)
   at MassTransit.Registration.TransportRegistrationBusFactory`1.CreateBus[T,TConfigurator](T configurator, IBusRegistrationContext context, Action`2 configure, IEnumerable`1 specifications)
   at MassTransit.Azure.ServiceBus.Core.Configuration.ServiceBusRegistrationBusFactory.CreateBus(IBusRegistrationContext context, IEnumerable`1 specifications)
   at MassTransit.ExtensionsDependencyInjectionIntegration.Registration.ServiceCollectionBusConfigurator.CreateBus[T](T busFactory, IServiceProvider provider)
   at MassTransit.ExtensionsDependencyInjectionIntegration.Registration.ServiceCollectionBusConfigurator.<>c__DisplayClass7_0`1.<SetBusFactory>b__0(IServiceProvider provider)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.DynamicServiceProviderEngine.<>c__DisplayClass1_0.<RealizeService>b__0(ServiceProviderEngineScope scope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
   at MassTransit.ExtensionsDependencyInjectionIntegration.Registration.ServiceCollectionBusConfigurator.<>c__7`1.<SetBusFactory>b__7_1(IServiceProvider provider)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitIEnumerable(IEnumerableCallSite enumerableCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitConstructor(ConstructorCallSite constructorCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.DynamicServiceProviderEngine.<>c__DisplayClass1_0.<RealizeService>b__0(ServiceProviderEngineScope scope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
   at MassTransit.HostedServiceConfigurationExtensions.<>c__DisplayClass1_0.<AddMassTransitHostedService>g__HostedServiceFactory|0(IServiceProvider provider)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitIEnumerable(IEnumerableCallSite enumerableCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.DynamicServiceProviderEngine.<>c__DisplayClass1_0.<RealizeService>b__0(ServiceProviderEngineScope scope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetService[T](IServiceProvider provider)
   at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>d__9.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.<RunAsync>d__4.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.<RunAsync>d__4.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()

首先,您需要从配置中删除重复的使用者。

订阅端点应如下所示配置:

factoryConfigurator.SubscriptionEndpoint<IMessageEvent>("my-subscription", endpointConfigurator =>
{
    endpointConfigurator.ConfigureConsumer<BatchMessageEventConsumer>(context);
});

此外,您应该将定义更改为:

public class BatchConsumerDefinition : 
    ConsumerDefinition<BatchMessageEventConsumer>
{
    public BatchConsumerDefinition()
    {
        Endpoint(x => x.PrefetchCount = 1000);
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<BatchMessageEventConsumer> consumerConfigurator)
    {
        endpointConfigurator.UseMessageRetry(x => x.Interval(2, 100));

        consumerConfigurator.Options<BatchOptions>(options => options
            .SetMessageLimit(100)
            .SetTimeLimit(1000)
            .SetConcurrencyLimit(10));
    }
}

这可能会解决它。