MassTransit - ScheduleRecurringSend 问题
MassTransit - Issue with ScheduleRecurringSend
我有一个服务,我想用它来为通过 REST api 的作业调度提供适配器。计划的作业是对各种其他服务进行 HTTP 调用。
我有以下实现:
[HttpPost]
public async Task<IActionResult> Post([FromBody] ScheduleModel schedule)
{
_logger.LogInformation(JsonConvert.SerializeObject(schedule));
var recurringSchedule = new AcmeRecurringSchedule
{
Description = schedule.Description,
CronExpression = schedule.CronExpression,
ScheduleId = schedule.JobIdentifier,
ScheduleGroup = schedule.Group
};
var schedulerUrl = new Uri($"{_massTransitSettings.Protocol}://{_massTransitSettings.RabbitMqHost}/scheduler");
var endpoint = await _busControl.GetSendEndpoint(schedulerUrl);
await endpoint.ScheduleRecurringSend(schedulerUrl, recurringSchedule, new ScheduledRestInvocationMessage()
{
Url = new Uri("http://wwww.google.com")
});
return Ok(schedule.Key);
}
但是我发现 ScheduleRecurringSend 方法并没有始终如一地将作业持久保存到 Db。它是间歇性的。我试过改变 ScheduleId 和 Group 但这会产生间歇性的结果。我在跳过的队列中收到很多消息。
我正在 Azure Service Fabric 中使用此服务的改装版本,以确保只有一项服务知道 Quartz DB 位置。
https://github.com/MassTransit/MassTransit-Quartz
这种做法有错吗?
谢谢
你用错了。
首先,您需要 configure 总线,以便它使用正确的调度程序地址:
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.UseMessageScheduler(
new Uri($"{_massTransitSettings.Protocol}://{_massTransitSettings.RabbitMqHost}/scheduler"));
});
然后,你需要正确使用ScheduleRecurringSend。第一个参数不是调度程序 Uri,而是将获取调度消息的服务端点的 Uri。必须在总线配置中指定调度程序 Uri。
我有一个服务,我想用它来为通过 REST api 的作业调度提供适配器。计划的作业是对各种其他服务进行 HTTP 调用。
我有以下实现:
[HttpPost]
public async Task<IActionResult> Post([FromBody] ScheduleModel schedule)
{
_logger.LogInformation(JsonConvert.SerializeObject(schedule));
var recurringSchedule = new AcmeRecurringSchedule
{
Description = schedule.Description,
CronExpression = schedule.CronExpression,
ScheduleId = schedule.JobIdentifier,
ScheduleGroup = schedule.Group
};
var schedulerUrl = new Uri($"{_massTransitSettings.Protocol}://{_massTransitSettings.RabbitMqHost}/scheduler");
var endpoint = await _busControl.GetSendEndpoint(schedulerUrl);
await endpoint.ScheduleRecurringSend(schedulerUrl, recurringSchedule, new ScheduledRestInvocationMessage()
{
Url = new Uri("http://wwww.google.com")
});
return Ok(schedule.Key);
}
但是我发现 ScheduleRecurringSend 方法并没有始终如一地将作业持久保存到 Db。它是间歇性的。我试过改变 ScheduleId 和 Group 但这会产生间歇性的结果。我在跳过的队列中收到很多消息。
我正在 Azure Service Fabric 中使用此服务的改装版本,以确保只有一项服务知道 Quartz DB 位置。
https://github.com/MassTransit/MassTransit-Quartz
这种做法有错吗?
谢谢
你用错了。
首先,您需要 configure 总线,以便它使用正确的调度程序地址:
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.UseMessageScheduler(
new Uri($"{_massTransitSettings.Protocol}://{_massTransitSettings.RabbitMqHost}/scheduler"));
});
然后,你需要正确使用ScheduleRecurringSend。第一个参数不是调度程序 Uri,而是将获取调度消息的服务端点的 Uri。必须在总线配置中指定调度程序 Uri。