MassTransit - ScheduleRecurringSend 问题

MassTransit - Issue with ScheduleRecurringSend

我有一个服务,我想用它来为通过 REST api 的作业调度提供适配器。计划的作业是对各种其他服务进行 HTTP 调用。

我有以下实现:

[HttpPost]
            public async Task<IActionResult> Post([FromBody] ScheduleModel schedule)
            {
                _logger.LogInformation(JsonConvert.SerializeObject(schedule));
     
                var recurringSchedule = new AcmeRecurringSchedule
                {
                    Description = schedule.Description,
                    CronExpression = schedule.CronExpression,
                    ScheduleId = schedule.JobIdentifier,
                    ScheduleGroup = schedule.Group
                };
     
                var schedulerUrl = new Uri($"{_massTransitSettings.Protocol}://{_massTransitSettings.RabbitMqHost}/scheduler");
     
                var endpoint = await _busControl.GetSendEndpoint(schedulerUrl);
     
                await endpoint.ScheduleRecurringSend(schedulerUrl, recurringSchedule, new ScheduledRestInvocationMessage()
            {
                Url = new Uri("http://wwww.google.com")
            });
 
            return Ok(schedule.Key);
        }

但是我发现 ScheduleRecurringSend 方法并没有始终如一地将作业持久保存到 Db。它是间歇性的。我试过改变 ScheduleId 和 Group 但这会产生间歇性的结果。我在跳过的队列中收到很多消息。

我正在 Azure Service Fabric 中使用此服务的改装版本,以确保只有一项服务知道 Quartz DB 位置。

https://github.com/MassTransit/MassTransit-Quartz

这种做法有错吗?

谢谢

你用错了。

首先,您需要 configure 总线,以便它使用正确的调度程序地址:

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });

    cfg.UseMessageScheduler(
        new Uri($"{_massTransitSettings.Protocol}://{_massTransitSettings.RabbitMqHost}/scheduler"));
});

然后,你需要正确使用ScheduleRecurringSend。第一个参数不是调度程序 Uri,而是将获取调度消息的服务端点的 Uri。必须在总线配置中指定调度程序 Uri。