MassTransit/Quartz.NET 调度在 InMemory 模式下正常工作,但在持久化模式下不起作用

MassTransit/Quartz.NET schedule works for InMemory but not in persistent mode

将 InMemoryMessageScheduler 与 Quartz 一起使用时,ScheduledMessage 确实会被调度,并且“消息”会在指定的时间发布。问题是,当 Quartz 启用持久化选项时,消息会被持久化到数据库中,ScheduledMessage 也被 ScheduleMessageConsumer 消费了,但“消息”永远不会在指定的时间发布。

我使用了 Sample-GettingStarted 示例,并做了以下更改:

Quartz 配置:

/// <summary>
/// Quartz.NET settings for a scheduler backed by an ADO.NET job store, kept as a
/// string dictionary so individual keys can be inspected or overridden before
/// being handed to Quartz.
/// </summary>
public class QuartzConfig : Dictionary<string, string>
{
    /// <summary>
    /// Populates the dictionary with the scheduler, serializer, job store and
    /// data source settings.
    /// </summary>
    /// <param name="connectionString">Connection string stored under the "myDS" data source.</param>
    public QuartzConfig(string connectionString)
    {
        // Scheduler identity.
        Add("quartz.scheduler.instanceName", "MassTransit-Scheduler");
        Add("quartz.scheduler.instanceId", "AUTO");

        // Serialization and job store implementation.
        Add("quartz.serializer.type", "json");
        Add("quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz");
        Add("quartz.jobStore.driverDelegateType", "Quartz.Impl.AdoJobStore.StdAdoDelegate, Quartz");

        // Tables and the data source the job store reads from.
        Add("quartz.jobStore.tablePrefix", "QRTZ_");
        Add("quartz.jobStore.dataSource", "myDS");
        Add("quartz.dataSource.myDS.provider", "Npgsql");
        Add("quartz.dataSource.myDS.connectionString", connectionString);
        Add("quartz.jobStore.useProperties", "true");
    }

    /// <summary>
    /// Copies every entry of this dictionary into a new <see cref="NameValueCollection"/>.
    /// </summary>
    /// <returns>A new collection holding the same key/value pairs.</returns>
    public NameValueCollection ToNameValueCollection()
    {
        var properties = new NameValueCollection();

        foreach (KeyValuePair<string, string> setting in this)
        {
            properties.Add(setting.Key, setting.Value);
        }

        return properties;
    }
}

配置:

/// <summary>
/// Builds the generic host: registers MassTransit over RabbitMQ, hosts the Quartz
/// schedule/cancel consumers on the "quartz" receive endpoint, and registers the
/// MassTransit hosted service and the <c>Worker</c> background service.
/// </summary>
/// <param name="args">Command-line arguments forwarded to the default host builder.</param>
/// <returns>The configured host builder.</returns>
public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddMassTransit(x =>
                {
                    x.AddConsumer<MessageConsumer>();
                    var scheduler = CreateScheduler();

                    x.UsingRabbitMq((context, cfg) =>
                    {
                        cfg.ReceiveEndpoint("quartz", endpoint =>
                        {
                            endpoint.Consumer(() => new ScheduleMessageConsumer(scheduler));
                            endpoint.Consumer(() => new CancelScheduledMessageConsumer(scheduler));
                            cfg.UseMessageScheduler(endpoint.InputAddress);

                            // The scheduler returned by CreateScheduler() is only created, never
                            // started, so triggers stored in the database would never fire. The
                            // Quartz integration's bus observer handles starting and stopping
                            // Quartz.NET together with the bus, so it has to be connected here.
                            // NOTE(review): the SchedulerBusObserver constructor differs between
                            // MassTransit releases (some take an extra options argument) — confirm
                            // against the installed package version.
                            cfg.ConnectBusObserver(new SchedulerBusObserver(scheduler, endpoint.InputAddress));
                        });
                        cfg.ConfigureEndpoints(context);
                    });
                });
                services.AddMassTransitHostedService();

                services.AddHostedService<Worker>();
            });

    /// <summary>
    /// Creates a Quartz scheduler from the <c>QuartzConfig</c> settings for the
    /// hard-coded connection string below.
    /// </summary>
    /// <returns>The scheduler obtained from the factory; this method does not start it.</returns>
    static IScheduler CreateScheduler()
    {
        const string connectionString = "Host=localhost;Database=scheduler;Port=5432;Password=pass;User ID=user;Pooling=true;MaxPoolSize=200;Enlist=true";

        var properties = new QuartzConfig(connectionString).ToNameValueCollection();
        ISchedulerFactory factory = new StdSchedulerFactory(properties);

        // Blocks the calling thread until the factory has produced the scheduler.
        var pendingScheduler = factory.GetScheduler();
        return pendingScheduler.GetAwaiter().GetResult();
    }

Worker.cs:

    /// <summary>
    /// Background service that repeatedly schedules a <c>Message</c> to be published
    /// five seconds in the future, pausing ten seconds between iterations.
    /// </summary>
    public class Worker : BackgroundService
    {
        readonly IBus _bus;

        /// <summary>Stores the bus used to create message schedulers.</summary>
        /// <param name="bus">The bus supplied by the container.</param>
        public Worker(IBus bus) => _bus = bus;

        /// <summary>
        /// Loops until cancellation is requested, scheduling one message per iteration.
        /// </summary>
        /// <param name="stoppingToken">Signals that the loop should end; also cancels the delay.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (true)
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    break;
                }

                // A fresh scheduler is created on every pass.
                var messageScheduler = _bus.CreateMessageScheduler();
                var publishAt = DateTime.UtcNow + TimeSpan.FromSeconds(5);
                var payload = new Message { Text = $"I really hope this is scheduled {DateTime.Now}" };

                await messageScheduler.SchedulePublish(publishAt, payload);
                await Task.Delay(10000, stoppingToken);
            }
        }
    }

有什么想法吗?

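Quartz 集成包通过连接一个总线观察器(bus observer)来处理 Quartz.NET 的启动和停止,如源码所示。不幸的是,文档并没有很好地说明如何做到这一点。因此,在手动配置持久化调度器时,需要自己创建该总线观察器(SchedulerBusObserver),并通过 ConnectBusObserver 将其连接到总线,否则 Quartz 调度器不会被启动,已持久化的消息也就不会在指定时间发布。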