BatchConsumer 的 ConsumerDefinition 中的 MassTransit UseMessageRetry 在启动时抛出错误
MassTransit UseMessageRetry in ConsumerDefinition for BatchConsumer throws error on startup
我正在尝试为 Azure 服务总线主题订阅设置批处理使用者。并没有真正让它发挥作用。使用下面的设置,由于 BatchConsumerDefinition
.
中的消息重试配置,启动时会出现异常
- 我是不是做错了什么?
- 如果我禁用重试,它就会开始。但是失败的消息不会在订阅的 DLQ 上结束。这是为什么?
谢谢!
services.AddMassTransit(configurator =>
{
configurator.AddConsumer<BatchMessageEventConsumer, BatchConsumerDefinition>();
configurator.UsingAzureServiceBus((context, factoryConfigurator) =>
{
factoryConfigurator.Host(asbConnectionString);
factoryConfigurator.SubscriptionEndpoint<IMessageEvent>("my-subscription", endpointConfigurator =>
{
endpointConfigurator.ConfigureConsumer<BatchMessageEventConsumer>(context);
endpointConfigurator.Batch<IMessageEvent>(x =>
{
x.Consumer<BatchMessageEventConsumer, IMessageEvent>(context); // provider, context, etc.
});
});
});
});
services.AddMassTransitHostedService(true);
public class BatchConsumerDefinition : ConsumerDefinition<BatchMessageEventConsumer>
{
public BatchConsumerDefinition()
{
Endpoint(x => x.PrefetchCount = 1000);
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<BatchMessageEventConsumer> consumerConfigurator)
{
// This will cause startup to throw: "System.ArgumentException: 'The type or method has 1 generic parameter(s), but 2 generic argument(s) were provided. A generic argument must be provided for each generic parameter.'"
consumerConfigurator.UseMessageRetry(x => x.Interval(2, 100));
consumerConfigurator.Options<BatchOptions>(options => options
.SetMessageLimit(100)
.SetTimeLimit(1000)
.SetConcurrencyLimit(10));
}
}
System.ArgumentException: 'The type or method has 1 generic parameter(s), but 2 generic argument(s) were provided. A generic argument must be provided for each generic parameter.'
at System.RuntimeType.SanityCheckGenericArguments(RuntimeType[] genericArguments, RuntimeType[] genericParamters)
at System.Reflection.RuntimeMethodInfo.MakeGenericMethod(Type[] methodInstantiation)
at MassTransit.PipeConfigurators.MessageRetryConsumerConfigurationObserver`1.MassTransit.ConsumeConfigurators.IConsumerConfigurationObserver.ConsumerMessageConfigured[T,TMessage](IConsumerMessageConfigurator`2 configurator)
at MassTransit.ConsumeConfigurators.ConsumerConfigurationObservable.<>c__DisplayClass1_0`2.<ConsumerMessageConfigured>b__0(IConsumerConfigurationObserver observer)
at GreenPipes.Util.Connectable`1.All(Func`2 callback)
at MassTransit.ConsumeConfigurators.ConsumerConfigurationObservable.ConsumerMessageConfigured[TConsumer,TMessage](IConsumerMessageConfigurator`2 configurator)
at MassTransit.ConsumerSpecifications.BatchConsumerMessageSpecification`2.<>c__DisplayClass7_0.<Validate>b__0(IConsumerConfigurationObserver observer)
at GreenPipes.Util.Connectable`1.All(Func`2 callback)
at MassTransit.ConsumerSpecifications.BatchConsumerMessageSpecification`2.Validate()
at MassTransit.ConsumerSpecifications.ConsumerSpecification`1.<>c.<Validate>b__7_1(IConsumerMessageSpecification`1 x)
at System.Linq.Enumerable.SelectManySingleSelectorIterator`2.MoveNext()
at MassTransit.ConsumerSpecifications.ConsumerSpecification`1.<Validate>d__7.MoveNext()
at System.Linq.Enumerable.ConcatIterator`1.MoveNext()
at System.Linq.Enumerable.SelectManySingleSelectorIterator`2.MoveNext()
at MassTransit.Configuration.ReceiveEndpointConfiguration.<Validate>d__24.MoveNext()
at System.Linq.Enumerable.ConcatIterator`1.MoveNext()
at System.Linq.Enumerable.ConcatIterator`1.MoveNext()
at System.Linq.Enumerable.SelectManySingleSelectorIterator`2.MoveNext()
at System.Collections.Generic.List`1.InsertRange(Int32 index, IEnumerable`1 collection)
at System.Collections.Generic.List`1.AddRange(IEnumerable`1 collection)
at System.Linq.Enumerable.ConcatIterator`1.ToList()
at System.Linq.Enumerable.ToList[TSource](IEnumerable`1 source)
at MassTransit.Configurators.BusConfigurationResult..ctor(IEnumerable`1 results)
at MassTransit.Configurators.BusConfigurationResult.CompileResults(IEnumerable`1 results)
at MassTransit.Registration.TransportRegistrationBusFactory`1.CreateBus[T,TConfigurator](T configurator, IBusRegistrationContext context, Action`2 configure, IEnumerable`1 specifications)
at MassTransit.Azure.ServiceBus.Core.Configuration.ServiceBusRegistrationBusFactory.CreateBus(IBusRegistrationContext context, IEnumerable`1 specifications)
at MassTransit.ExtensionsDependencyInjectionIntegration.Registration.ServiceCollectionBusConfigurator.CreateBus[T](T busFactory, IServiceProvider provider)
at MassTransit.ExtensionsDependencyInjectionIntegration.Registration.ServiceCollectionBusConfigurator.<>c__DisplayClass7_0`1.<SetBusFactory>b__0(IServiceProvider provider)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.DynamicServiceProviderEngine.<>c__DisplayClass1_0.<RealizeService>b__0(ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
at MassTransit.ExtensionsDependencyInjectionIntegration.Registration.ServiceCollectionBusConfigurator.<>c__7`1.<SetBusFactory>b__7_1(IServiceProvider provider)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitIEnumerable(IEnumerableCallSite enumerableCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitConstructor(ConstructorCallSite constructorCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.DynamicServiceProviderEngine.<>c__DisplayClass1_0.<RealizeService>b__0(ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
at MassTransit.HostedServiceConfigurationExtensions.<>c__DisplayClass1_0.<AddMassTransitHostedService>g__HostedServiceFactory|0(IServiceProvider provider)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitIEnumerable(IEnumerableCallSite enumerableCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.DynamicServiceProviderEngine.<>c__DisplayClass1_0.<RealizeService>b__0(ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetService[T](IServiceProvider provider)
at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>d__9.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.<RunAsync>d__4.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.<RunAsync>d__4.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
首先,您需要从配置中删除重复的使用者。
订阅端点应如下所示配置:
factoryConfigurator.SubscriptionEndpoint<IMessageEvent>("my-subscription", endpointConfigurator =>
{
endpointConfigurator.ConfigureConsumer<BatchMessageEventConsumer>(context);
});
此外,您应该将定义更改为:
public class BatchConsumerDefinition :
ConsumerDefinition<BatchMessageEventConsumer>
{
public BatchConsumerDefinition()
{
Endpoint(x => x.PrefetchCount = 1000);
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<BatchMessageEventConsumer> consumerConfigurator)
{
endpointConfigurator.UseMessageRetry(x => x.Interval(2, 100));
consumerConfigurator.Options<BatchOptions>(options => options
.SetMessageLimit(100)
.SetTimeLimit(1000)
.SetConcurrencyLimit(10));
}
}
这可能会解决它。
我正在尝试为 Azure 服务总线主题订阅设置批处理使用者。并没有真正让它发挥作用。使用下面的设置,由于 BatchConsumerDefinition
.
- 我是不是做错了什么?
- 如果我禁用重试,它就会开始。但是失败的消息不会在订阅的 DLQ 上结束。这是为什么?
谢谢!
services.AddMassTransit(configurator =>
{
configurator.AddConsumer<BatchMessageEventConsumer, BatchConsumerDefinition>();
configurator.UsingAzureServiceBus((context, factoryConfigurator) =>
{
factoryConfigurator.Host(asbConnectionString);
factoryConfigurator.SubscriptionEndpoint<IMessageEvent>("my-subscription", endpointConfigurator =>
{
endpointConfigurator.ConfigureConsumer<BatchMessageEventConsumer>(context);
endpointConfigurator.Batch<IMessageEvent>(x =>
{
x.Consumer<BatchMessageEventConsumer, IMessageEvent>(context); // provider, context, etc.
});
});
});
});
services.AddMassTransitHostedService(true);
public class BatchConsumerDefinition : ConsumerDefinition<BatchMessageEventConsumer>
{
public BatchConsumerDefinition()
{
Endpoint(x => x.PrefetchCount = 1000);
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<BatchMessageEventConsumer> consumerConfigurator)
{
// This will cause startup to throw: "System.ArgumentException: 'The type or method has 1 generic parameter(s), but 2 generic argument(s) were provided. A generic argument must be provided for each generic parameter.'"
consumerConfigurator.UseMessageRetry(x => x.Interval(2, 100));
consumerConfigurator.Options<BatchOptions>(options => options
.SetMessageLimit(100)
.SetTimeLimit(1000)
.SetConcurrencyLimit(10));
}
}
System.ArgumentException: 'The type or method has 1 generic parameter(s), but 2 generic argument(s) were provided. A generic argument must be provided for each generic parameter.'
at System.RuntimeType.SanityCheckGenericArguments(RuntimeType[] genericArguments, RuntimeType[] genericParamters)
at System.Reflection.RuntimeMethodInfo.MakeGenericMethod(Type[] methodInstantiation)
at MassTransit.PipeConfigurators.MessageRetryConsumerConfigurationObserver`1.MassTransit.ConsumeConfigurators.IConsumerConfigurationObserver.ConsumerMessageConfigured[T,TMessage](IConsumerMessageConfigurator`2 configurator)
at MassTransit.ConsumeConfigurators.ConsumerConfigurationObservable.<>c__DisplayClass1_0`2.<ConsumerMessageConfigured>b__0(IConsumerConfigurationObserver observer)
at GreenPipes.Util.Connectable`1.All(Func`2 callback)
at MassTransit.ConsumeConfigurators.ConsumerConfigurationObservable.ConsumerMessageConfigured[TConsumer,TMessage](IConsumerMessageConfigurator`2 configurator)
at MassTransit.ConsumerSpecifications.BatchConsumerMessageSpecification`2.<>c__DisplayClass7_0.<Validate>b__0(IConsumerConfigurationObserver observer)
at GreenPipes.Util.Connectable`1.All(Func`2 callback)
at MassTransit.ConsumerSpecifications.BatchConsumerMessageSpecification`2.Validate()
at MassTransit.ConsumerSpecifications.ConsumerSpecification`1.<>c.<Validate>b__7_1(IConsumerMessageSpecification`1 x)
at System.Linq.Enumerable.SelectManySingleSelectorIterator`2.MoveNext()
at MassTransit.ConsumerSpecifications.ConsumerSpecification`1.<Validate>d__7.MoveNext()
at System.Linq.Enumerable.ConcatIterator`1.MoveNext()
at System.Linq.Enumerable.SelectManySingleSelectorIterator`2.MoveNext()
at MassTransit.Configuration.ReceiveEndpointConfiguration.<Validate>d__24.MoveNext()
at System.Linq.Enumerable.ConcatIterator`1.MoveNext()
at System.Linq.Enumerable.ConcatIterator`1.MoveNext()
at System.Linq.Enumerable.SelectManySingleSelectorIterator`2.MoveNext()
at System.Collections.Generic.List`1.InsertRange(Int32 index, IEnumerable`1 collection)
at System.Collections.Generic.List`1.AddRange(IEnumerable`1 collection)
at System.Linq.Enumerable.ConcatIterator`1.ToList()
at System.Linq.Enumerable.ToList[TSource](IEnumerable`1 source)
at MassTransit.Configurators.BusConfigurationResult..ctor(IEnumerable`1 results)
at MassTransit.Configurators.BusConfigurationResult.CompileResults(IEnumerable`1 results)
at MassTransit.Registration.TransportRegistrationBusFactory`1.CreateBus[T,TConfigurator](T configurator, IBusRegistrationContext context, Action`2 configure, IEnumerable`1 specifications)
at MassTransit.Azure.ServiceBus.Core.Configuration.ServiceBusRegistrationBusFactory.CreateBus(IBusRegistrationContext context, IEnumerable`1 specifications)
at MassTransit.ExtensionsDependencyInjectionIntegration.Registration.ServiceCollectionBusConfigurator.CreateBus[T](T busFactory, IServiceProvider provider)
at MassTransit.ExtensionsDependencyInjectionIntegration.Registration.ServiceCollectionBusConfigurator.<>c__DisplayClass7_0`1.<SetBusFactory>b__0(IServiceProvider provider)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.DynamicServiceProviderEngine.<>c__DisplayClass1_0.<RealizeService>b__0(ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
at MassTransit.ExtensionsDependencyInjectionIntegration.Registration.ServiceCollectionBusConfigurator.<>c__7`1.<SetBusFactory>b__7_1(IServiceProvider provider)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitIEnumerable(IEnumerableCallSite enumerableCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitConstructor(ConstructorCallSite constructorCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.DynamicServiceProviderEngine.<>c__DisplayClass1_0.<RealizeService>b__0(ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
at MassTransit.HostedServiceConfigurationExtensions.<>c__DisplayClass1_0.<AddMassTransitHostedService>g__HostedServiceFactory|0(IServiceProvider provider)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitIEnumerable(IEnumerableCallSite enumerableCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.DynamicServiceProviderEngine.<>c__DisplayClass1_0.<RealizeService>b__0(ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetService[T](IServiceProvider provider)
at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>d__9.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.<RunAsync>d__4.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.<RunAsync>d__4.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
首先,您需要从配置中删除重复的使用者。
订阅端点应如下所示配置:
factoryConfigurator.SubscriptionEndpoint<IMessageEvent>("my-subscription", endpointConfigurator =>
{
endpointConfigurator.ConfigureConsumer<BatchMessageEventConsumer>(context);
});
此外,您应该将定义更改为:
public class BatchConsumerDefinition :
ConsumerDefinition<BatchMessageEventConsumer>
{
public BatchConsumerDefinition()
{
Endpoint(x => x.PrefetchCount = 1000);
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<BatchMessageEventConsumer> consumerConfigurator)
{
endpointConfigurator.UseMessageRetry(x => x.Interval(2, 100));
consumerConfigurator.Options<BatchOptions>(options => options
.SetMessageLimit(100)
.SetTimeLimit(1000)
.SetConcurrencyLimit(10));
}
}
这可能会解决它。