MassTransit.QuartzIntegration.ScheduledMessageJob: Cannot instantiate type which has no empty constructor (Parameter 'ScheduledMessageJob')
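I'm currently porting a project to .NET 5, and I've been struggling with the MassTransit and Quartz scheduler configuration.
Non-scheduled messages work fine, but scheduled messages do not.
I can see the scheduled jobs in the Quartz DB, but when a job is triggered automatically I get the following error: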
Quartz.SchedulerException: Problem instantiating class 'MassTransit.QuartzIntegration.ScheduledMessageJob: Cannot instantiate type which has no empty constructor (Parameter 'ScheduledMessageJob')'
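The project consists of:
- A web application that sends both regular and scheduled messages.
- A Windows service (BackgroundService) that should consume messages and send (non-scheduled) messages.
Web application
In startup.cs I have: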
NameValueCollection properties = new();
properties.Add("quartz.scheduler.instanceName", "QuartzSchedulerInstance1");
properties.Add("quartz.scheduler.instanceId", "Quartz_instance_1");
properties.Add("quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz");
properties.Add("quartz.threadPool.threadCount", "10");
properties.Add("quartz.threadPool.threadPriority", "2");
properties.Add("quartz.jobStore.misfireThreshold", "60000");
properties.Add("quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz");
properties.Add("quartz.jobStore.useProperties", "true");
properties.Add("quartz.jobStore.dataSource", "default");
properties.Add("quartz.jobStore.tablePrefix", "QRTZ_");
properties.Add("quartz.jobStore.lockHandler.type", "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz");
properties.Add("quartz.dataSource.default.connectionString", "Server=(local);Database=Quartz;Trusted_Connection=True;");
properties.Add("quartz.dataSource.default.provider", "SqlServer");
properties.Add("quartz.serializer.type", "json");
StdSchedulerFactory factory = new StdSchedulerFactory(properties);
var scheduler = factory.GetScheduler().Result;
services.AddSingleton(scheduler);
services.AddMassTransit(x =>
{
x.AddMessageScheduler(busSettings.QuartzQueueAddress);
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(busSettings.RabbitMQAddress, host =>
{
host.Username(busSettings.RabbitMQLogin);
host.Password(busSettings.RabbitMQPassword);
});
cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
A non-scheduled message is sent like this:
var message = new BuildReminderDeploySetRequest(Guid.NewGuid(), ... other parameters...);
var target = await _bus.GetSendEndpoint(_busSettings.ReminderRequestQueueAddress);
await target.Send(message);
whereas a scheduled one is sent like this:
var message = new BuildReminderDeploySetRequest(Guid.NewGuid(), ... other parameters...);
await _scheduler.ScheduleSend(_busSettings.ReminderRequestQueueAddress, reminderDate, message);
The following are injected through DI:
IBusControl _bus for non-scheduled messages
IMessageScheduler _scheduler for scheduled messages.
Windows service
In Program.cs I have:
services.AddTransient<ProcessBuildReminderDeploySetConsumer>();
NameValueCollection properties = new();
properties.Add("quartz.scheduler.instanceName", "QuartzSchedulerInstance1");
properties.Add("quartz.scheduler.instanceId", "Quartz_instance_1");
properties.Add("quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz");
properties.Add("quartz.threadPool.threadCount", "10");
properties.Add("quartz.threadPool.threadPriority", "2");
properties.Add("quartz.jobStore.misfireThreshold", "60000");
properties.Add("quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz");
properties.Add("quartz.jobStore.useProperties", "true");
properties.Add("quartz.jobStore.dataSource", "default");
properties.Add("quartz.jobStore.tablePrefix", "QRTZ_");
properties.Add("quartz.jobStore.lockHandler.type", "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz");
properties.Add("quartz.dataSource.default.connectionString", "Server=(local);Database=Quartz;Trusted_Connection=True;");
properties.Add("quartz.dataSource.default.provider", "SqlServer");
properties.Add("quartz.serializer.type", "json");
StdSchedulerFactory factory = new StdSchedulerFactory(properties);
var scheduler = factory.GetScheduler().Result;
scheduler.Start();
services.AddSingleton(scheduler);
services.AddMassTransit(conf =>
{
conf.AddMessageScheduler(busSettings.QuartzQueueAddress);
conf.UsingRabbitMq((context, cfg) =>
{
cfg.Host(busSettings.RabbitMQAddress, host =>
{
host.Username(busSettings.RabbitMQLogin);
host.Password(busSettings.RabbitMQPassword);
});
cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);
cfg.ReceiveEndpoint(busSettings.QuartzQueueName, epc =>
{
var partitioner = cfg.CreatePartitioner(16);
epc.Consumer(() =>
new ScheduleMessageConsumer(scheduler),
x => x.Message<ScheduleMessage>(
m => m.UsePartitioner(
partitioner,
p => p.Message.CorrelationId)
)
);
epc.Consumer(
() => new CancelScheduledMessageConsumer(scheduler),
x => x.Message<CancelScheduledMessage>(
m => m.UsePartitioner(partitioner, p => p.Message.TokenId)
)
);
cfg.UseMessageScheduler(epc.InputAddress);
cfg.Durable = true;
// THIS USED TO BE IN THE OLD .NET 4.6 PROJECT
// BUT THOSE CLASSES DO NOT EXIST ANYMORE
//var specification =
//new SchedulerBusFactorySpecification(scheduler, epc.InputAddress);
//cfg.AddBusFactorySpecification(specification);
});
cfg.ReceiveEndpoint(busSettings.ReminderRequestQueueName, e =>
{
// Enforce loading only 1 message at a time for now
e.PrefetchCount = 1;
e.UseMessageRetry(r =>
{
r.Interval(retryCount: 5, interval: TimeSpan.FromMilliseconds(500));
});
e.Consumer<ProcessBuildReminderDeploySetConsumer>(context);
});
cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
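Messages sent directly to the queue work without problems.
Scheduled messages are never consumed, because Quartz fails to trigger them.
I'm sure a configuration issue is causing this error, but between the different ways of configuring things and the changes across versions, I can't figure it out.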
With the current version of Quartz, there are extension methods for configuring the service collection. These methods add ISchedulerFactory to the container.
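As a rough sketch of what that registration might look like, the property bag from the question could be expressed with the Quartz.Extensions.DependencyInjection package (plus Quartz.Serialization.Json for the JSON serializer). The helper name AddPersistentQuartz and its connectionString parameter are hypothetical, and the mapping from the original properties is an assumption rather than a tested migration:

```csharp
using Microsoft.Extensions.DependencyInjection;
using Quartz;

public static class QuartzRegistration
{
    // Hypothetical helper: the name and the parameter are illustrative only.
    public static IServiceCollection AddPersistentQuartz(
        this IServiceCollection services, string connectionString)
    {
        // AddQuartz registers ISchedulerFactory in the container.
        services.AddQuartz(q =>
        {
            // Same scheduler identity as in the question's property bag.
            q.SchedulerName = "QuartzSchedulerInstance1";
            q.SchedulerId = "Quartz_instance_1";

            // Roughly equivalent to quartz.threadPool.threadCount = 10.
            q.UseDefaultThreadPool(tp => tp.MaxConcurrency = 10);

            // ADO.NET job store instead of the default in-memory store.
            q.UsePersistentStore(store =>
            {
                store.UseProperties = true;
                store.UseSqlServer(sql =>
                {
                    sql.ConnectionString = connectionString;
                    sql.TablePrefix = "QRTZ_";
                });
                store.UseJsonSerializer();
            });
        });

        return services;
    }
}
```

It would be called once from the service configuration of the process hosting Quartz, for example services.AddPersistentQuartz("Server=(local);Database=Quartz;Trusted_Connection=True;"), before the call to AddMassTransit.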
Then, in the process where you want to host Quartz, you can configure it in MassTransit using a not-so-obvious extension method:
services.AddMassTransit(conf =>
{
conf.AddMessageScheduler(busSettings.QuartzQueueAddress);
conf.UsingRabbitMq((context, cfg) =>
{
cfg.Host(busSettings.RabbitMQAddress, host =>
{
host.Username(busSettings.RabbitMQLogin);
host.Password(busSettings.RabbitMQPassword);
});
cfg.UseInMemoryScheduler(context, busSettings.QuartzQueueName);
});
});
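This will use the scheduler factory configured by Quartz (which, obviously, you would configure to not be in-memory). It also configures a bus observer to start/stop Quartz automatically as the bus starts/stops.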