MassTransit/Quartz.NET 计划适用于 InMemory 但不适用于持久模式
MassTransit/Quartz.NET schedule works for InMemory but not in persistent mode
当将 InMemoryMessageScheduler 与 Quartz 一起使用时,ScheduledMessage 实际上得到了调度,并且“消息”在定义的时间发布。
问题是当使用带有持久化选项的 Quartz 时,消息被持久化到数据库中,ScheduledMessage 被 ScheduleMessageConsumer 使用但是“消息”永远不会在定义的时间发布。
我使用了 Sample-GettingStarted 并添加了下一个更改:
石英配置:
public class QuartzConfig : Dictionary<string, string>
{
public QuartzConfig(string connectionString)
{
this["quartz.scheduler.instanceName"] = "MassTransit-Scheduler";
this["quartz.scheduler.instanceId"] = "AUTO";
this["quartz.serializer.type"] = "json";
this["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz";
this["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.StdAdoDelegate, Quartz";
this["quartz.jobStore.tablePrefix"] = "QRTZ_";
this["quartz.jobStore.dataSource"] = "myDS";
this["quartz.dataSource.myDS.provider"] = "Npgsql";
this["quartz.dataSource.myDS.connectionString"] = connectionString;
this["quartz.jobStore.useProperties"] = "true";
}
public NameValueCollection ToNameValueCollection()
{
return this.Aggregate(new NameValueCollection(), (seed, current) =>
{
seed.Add(current.Key, current.Value);
return seed;
});
}
}
配置:
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(x =>
{
x.AddConsumer<MessageConsumer>();
var scheduler = CreateScheduler();
x.UsingRabbitMq((context,cfg) =>
{
cfg.ReceiveEndpoint("quartz", endpoint =>
{
endpoint.Consumer(() => new ScheduleMessageConsumer(scheduler));
endpoint.Consumer(() => new CancelScheduledMessageConsumer(scheduler));
cfg.UseMessageScheduler(endpoint.InputAddress);
});
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
services.AddHostedService<Worker>();
});
static IScheduler CreateScheduler()
{
var dbConnectionString = "Host=localhost;Database=scheduler;Port=5432;Password=pass;User ID=user;Pooling=true;MaxPoolSize=200;Enlist=true";
var quartzConfig = new QuartzConfig(dbConnectionString)
.ToNameValueCollection();
ISchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzConfig);
return schedulerFactory.GetScheduler().GetAwaiter().GetResult();
}
Worker.cs:
public class Worker : BackgroundService
{
readonly IBus _bus;
public Worker(IBus bus)
{
_bus = bus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await _bus.CreateMessageScheduler().SchedulePublish(DateTime.UtcNow + TimeSpan.FromSeconds(5), new Message { Text = $"I really hope this is scheduled {DateTime.Now}" });
await Task.Delay(10000, stoppingToken);
}
}
}
有什么想法吗?
Quartz 集成包连接一个总线观察器来处理 Quartz.NET 的 start/stop,如 shown in the source。不幸的是,文档并没有很好地说明如何做到这一点。
当将 InMemoryMessageScheduler 与 Quartz 一起使用时,ScheduledMessage 实际上得到了调度,并且“消息”在定义的时间发布。 问题是当使用带有持久化选项的 Quartz 时,消息被持久化到数据库中,ScheduledMessage 被 ScheduleMessageConsumer 使用但是“消息”永远不会在定义的时间发布。
我使用了 Sample-GettingStarted 并添加了下一个更改:
石英配置:
public class QuartzConfig : Dictionary<string, string>
{
public QuartzConfig(string connectionString)
{
this["quartz.scheduler.instanceName"] = "MassTransit-Scheduler";
this["quartz.scheduler.instanceId"] = "AUTO";
this["quartz.serializer.type"] = "json";
this["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz";
this["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.StdAdoDelegate, Quartz";
this["quartz.jobStore.tablePrefix"] = "QRTZ_";
this["quartz.jobStore.dataSource"] = "myDS";
this["quartz.dataSource.myDS.provider"] = "Npgsql";
this["quartz.dataSource.myDS.connectionString"] = connectionString;
this["quartz.jobStore.useProperties"] = "true";
}
public NameValueCollection ToNameValueCollection()
{
return this.Aggregate(new NameValueCollection(), (seed, current) =>
{
seed.Add(current.Key, current.Value);
return seed;
});
}
}
配置:
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(x =>
{
x.AddConsumer<MessageConsumer>();
var scheduler = CreateScheduler();
x.UsingRabbitMq((context,cfg) =>
{
cfg.ReceiveEndpoint("quartz", endpoint =>
{
endpoint.Consumer(() => new ScheduleMessageConsumer(scheduler));
endpoint.Consumer(() => new CancelScheduledMessageConsumer(scheduler));
cfg.UseMessageScheduler(endpoint.InputAddress);
});
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
services.AddHostedService<Worker>();
});
static IScheduler CreateScheduler()
{
var dbConnectionString = "Host=localhost;Database=scheduler;Port=5432;Password=pass;User ID=user;Pooling=true;MaxPoolSize=200;Enlist=true";
var quartzConfig = new QuartzConfig(dbConnectionString)
.ToNameValueCollection();
ISchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzConfig);
return schedulerFactory.GetScheduler().GetAwaiter().GetResult();
}
Worker.cs:
public class Worker : BackgroundService
{
readonly IBus _bus;
public Worker(IBus bus)
{
_bus = bus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await _bus.CreateMessageScheduler().SchedulePublish(DateTime.UtcNow + TimeSpan.FromSeconds(5), new Message { Text = $"I really hope this is scheduled {DateTime.Now}" });
await Task.Delay(10000, stoppingToken);
}
}
}
有什么想法吗?
Quartz 集成包连接一个总线观察器来处理 Quartz.NET 的 start/stop,如 shown in the source。不幸的是,文档并没有很好地说明如何做到这一点。