MassTransit.QuartzIntegration.ScheduledMessageJob: 无法实例化没有空构造函数的类型(参数 'ScheduledMessageJob')'

MassTransit.QuartzIntegration.ScheduledMessageJob: Cannot instantiate type which has no empty constructor (Parameter 'ScheduledMessageJob')'

目前正在将一个项目移植到 .Net 5,我一直在为 MassTransit 和 Quartz 调度程序配置而苦恼。

非预定消息工作正常,但预定消息则不然。

我能够在 Quartz DB 中看到计划的作业,但是当自动触发作业时出现以下错误:

Quartz.SchedulerException: Problem instantiating class 'MassTransit.QuartzIntegration.ScheduledMessageJob: Cannot instantiate type which has no empty constructor (Parameter 'ScheduledMessageJob')'

本项目有

网站

在 startup.cs 我有:

        NameValueCollection properties = new();
        properties.Add("quartz.scheduler.instanceName", "QuartzSchedulerInstance1");
        properties.Add("quartz.scheduler.instanceId", "Quartz_instance_1");
        properties.Add("quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz");
        properties.Add("quartz.threadPool.threadCount", "10");
        properties.Add("quartz.threadPool.threadPriority", "2");
        properties.Add("quartz.jobStore.misfireThreshold", "60000");
        properties.Add("quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz");
        properties.Add("quartz.jobStore.useProperties", "true");
        properties.Add("quartz.jobStore.dataSource", "default");
        properties.Add("quartz.jobStore.tablePrefix", "QRTZ_");
        properties.Add("quartz.jobStore.lockHandler.type", "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz");
        properties.Add("quartz.dataSource.default.connectionString", "Server=(local);Database=Quartz;Trusted_Connection=True;");
        properties.Add("quartz.dataSource.default.provider", "SqlServer");
        properties.Add("quartz.serializer.type", "json");

        StdSchedulerFactory factory = new StdSchedulerFactory(properties);
        var scheduler = factory.GetScheduler().Result;
        services.AddSingleton(scheduler);

        services.AddMassTransit(x =>
        {
            x.AddMessageScheduler(busSettings.QuartzQueueAddress);

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(busSettings.RabbitMQAddress, host =>
                {
                    host.Username(busSettings.RabbitMQLogin);
                    host.Password(busSettings.RabbitMQPassword);
                });
                cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);
                cfg.ConfigureEndpoints(context);
            });
        });

        services.AddMassTransitHostedService();

一条非预定消息是这样发送的:

         var message = new BuildReminderDeploySetRequest(Guid.NewGuid(), ... other parameters...);
        var target = await _bus.GetSendEndpoint(_busSettings.ReminderRequestQueueAddress);
        await target.Send(message);

虽然预定的一个是这样发送的:

         var message = new BuildReminderDeploySetRequest(Guid.NewGuid(),  ... other parameters...);
        await _scheduler.ScheduleSend(_busSettings.ReminderRequestQueueAddress, reminderDate, message);

分别通过DI注入了以下内容: IBusControl _bus 用于非计划消息 IMessageScheduler _scheduler 用于计划消息。

Windows服务

在 Program.cs 我有:

        services.AddTransient<ProcessBuildReminderDeploySetConsumer>();

        NameValueCollection properties = new();
        properties.Add("quartz.scheduler.instanceName", "QuartzSchedulerInstance1");
        properties.Add("quartz.scheduler.instanceId", "Quartz_instance_1");
        properties.Add("quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz");
        properties.Add("quartz.threadPool.threadCount", "10");
        properties.Add("quartz.threadPool.threadPriority", "2");
        properties.Add("quartz.jobStore.misfireThreshold", "60000");
        properties.Add("quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz");
        properties.Add("quartz.jobStore.useProperties", "true");
        properties.Add("quartz.jobStore.dataSource", "default");
        properties.Add("quartz.jobStore.tablePrefix", "QRTZ_");
        properties.Add("quartz.jobStore.lockHandler.type", "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz");
        properties.Add("quartz.dataSource.default.connectionString", "Server=(local);Database=Quartz;Trusted_Connection=True;");
        properties.Add("quartz.dataSource.default.provider", "SqlServer");
        properties.Add("quartz.serializer.type", "json");

        StdSchedulerFactory factory = new StdSchedulerFactory(properties);
        var scheduler = factory.GetScheduler().Result;
        scheduler.Start();
        services.AddSingleton(scheduler);

        services.AddMassTransit(conf =>
        {
            conf.AddMessageScheduler(busSettings.QuartzQueueAddress);

            conf.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(busSettings.RabbitMQAddress, host =>
                {
                    host.Username(busSettings.RabbitMQLogin);
                    host.Password(busSettings.RabbitMQPassword);
                });
                cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);

                cfg.ReceiveEndpoint(busSettings.QuartzQueueName, epc =>
                {
                    var partitioner = cfg.CreatePartitioner(16);
                    epc.Consumer(() =>
                        new ScheduleMessageConsumer(scheduler),
                        x => x.Message<ScheduleMessage>(
                            m => m.UsePartitioner(
                                partitioner,
                                p => p.Message.CorrelationId)
                            )
                    );
                    epc.Consumer(
                        () => new CancelScheduledMessageConsumer(scheduler),
                        x => x.Message<CancelScheduledMessage>(
                            m => m.UsePartitioner(partitioner, p => p.Message.TokenId)
                        )
                    );
                    cfg.UseMessageScheduler(epc.InputAddress);
                    cfg.Durable = true;

                    // THIS USED TO BE IN THE OLD .NET 4.6 PROJECT
                    // BUT THOSE CLASSES DO NOT EXIST ANYMORE

                    //var specification =
                    //new SchedulerBusFactorySpecification(scheduler, epc.InputAddress);
                    //cfg.AddBusFactorySpecification(specification);
                });
                cfg.ReceiveEndpoint(busSettings.ReminderRequestQueueName, e =>
                {
                    // Enforce loading only 1 message at a time for now
                    e.PrefetchCount = 1;
                    e.UseMessageRetry(r =>
                    {
                        r.Interval(retryCount: 5, interval: TimeSpan.FromMilliseconds(500));
                    });
                    e.Consumer<ProcessBuildReminderDeploySetConsumer>(context);
                });

                cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);

                cfg.ConfigureEndpoints(context);
            });
        });

        services.AddMassTransitHostedService();

使用直接发送到队列的消息没有问题。 关于定时消息,由于没有被Quartz正确触发,所以无法消费。

我确定可能有一些配置问题导致了这个错误,但是在各种配置方式和版本之间的变化之间,我无法弄清楚这一点。

对于当前版本的 Quartz,有 extension methods for configuring the service collection。这些方法将 ISchedulerFactory 添加到容器中。

然后,在你想要托管quartz的过程中,你可以在MassTransit中使用配置中的非显而易见的扩展方法来配置它:

services.AddMassTransit(conf =>
{
    conf.AddMessageScheduler(busSettings.QuartzQueueAddress);

    conf.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(busSettings.RabbitMQAddress, host =>
        {
            host.Username(busSettings.RabbitMQLogin);
            host.Password(busSettings.RabbitMQPassword);
        });

        cfg.UseInMemoryScheduler(context, busSettings.QuartzQueueName)

这将使用 Quartz 配置的调度程序工厂(很明显,您将其配置为不在内存中。它还将总线观察器配置为 start/stop quartz 自动作为总线 starts/stops .