来自 saga 的成功 request/response 在 saga 跳过队列中留下已取消的消息

Successful request/response from saga leaves Canceled message in saga skipped queue

经过大量的努力和反复试验,我设法从我的 saga 发出了一个“请求”,并看到它处理了响应。然而,由于在我所在州的跳过队列中出现一条消息,我的喜悦被打断了。 (我正在使用 azure 服务总线)

类型为“urn:message:MassTransit.Scheduling:CancelScheduledMessage”。

我是公共交通方面的新手,我只是想举一个人为的例子。

我的传奇故事叫 TaxiToRunway/TaxiingComplete。我的传奇代码

Request(()=>TaxiToRunway, config =>
        {
            config.Timeout = TimeSpan.FromSeconds(30);
        });
...
public Request<PlaneState, TaxiToRunway, TaxiingComplete> TaxiToRunway { get; private set; }
...
Initially(
            When(ReadyToDepart)
                .Then(context =>
                {
                    context.Saga.Altitude = 0;
                    context.Saga.Speed = 0;
                    context.Saga.FlightNo = context.Message.FlightNo;
                    context.Saga.CorrelationId = context.Message.CorrelationId;
                    Console.WriteLine($"Flight {context.Message.FlightNo} is ready to depart.");
                })
                .TransitionTo(Taxiing)
                .Request(TaxiToRunway,
                    (context) => context.Init<TaxiToRunway>(new {CorrelationId = context.Saga.CorrelationId}))
...
During(Taxiing, 
            Ignore(ReadyToDepart),
            
                When(TaxiToRunway.Completed)
                    .Then(x =>
                    {
                        x.ToString();
                    })
                    .TransitionTo(TakingOff),

在附加调试器的情况下,我点击了 x.ToString() 行。

消费者(在不同的主机):

public class TaxiToRunwayConsumer: IConsumer<TaxiToRunway>
{
    public async Task Consume(ConsumeContext<TaxiToRunway> context)
    {
        await context.RespondAsync<TaxiingComplete>(new
        {
            context.Message.CorrelationId
        });
    }
}

Saga 启动配置:

cfg.AddSagaStateMachine<PlaneStateMachine, PlaneState>()
                .MessageSessionRepository();
            
            cfg.AddServiceBusMessageScheduler();

            cfg.UsingAzureServiceBus((context, sbCfg) =>
            {
                var connectionString = appConfig.ServiceBus.ConnectionString;
                sbCfg.Host(connectionString);
                
                EndpointConvention.Map<TaxiToRunway>(new Uri("sb://xxx.servicebus.windows.net/taxi-to-runway"));
                
                sbCfg.UseServiceBusMessageScheduler();
                
                sbCfg.ReceiveEndpoint("plane-state", e =>
                {
                    e.UseInMemoryOutbox();
                    e.RequiresSession = true;
                    e.PrefetchCount = 50;
                    e.MaxConcurrentCalls = 50;
                    e.ConfigureSaga<PlaneState>(context);
                });
                
                sbCfg.ConfigureEndpoints(context);
            });

我可以在日志输出中看到:

dbug: MassTransit.Messages[0]
      SEND sb://dbpdf-us-dev-sam.servicebus.windows.net/plane-state 80d90000-5d7b-2cf0-7a6b-08da0fd3e7b7 MassTransit.Scheduling.CancelScheduledMessage

我应该把这当作一个事件来处理吗?

这方面的学习曲线肯定很陡峭!我的问题是我需要做什么才能不让这些消息被跳过?

所以,这不起作用的原因:

  1. 消息会话 saga 存储库只能通过 SessionId 关联,因为它是 session-stored 数据。
  2. 因此,requestId 必须等于 saga 实例 correlationId(也称为 SessionId)
  3. 请求发送的超时消息,根据预定消息的序号获取tokenId
  4. 没有保存在任何地方
  5. 所以请求超时没有被取消

在这种情况下,正确的方法是使用没有超时的 Request/Response 并使用单独的时间表自行安排超时。