来自 saga 的成功 request/response 在 saga 跳过队列中留下已取消的消息
Successful request/response from saga leaves Canceled message in saga skipped queue
经过大量的努力和反复试验,我设法从我的 saga 发出了一个“请求”,并看到它处理了响应。然而,由于在我所在州的跳过队列中出现一条消息,我的喜悦被打断了。 (我正在使用 azure 服务总线)
类型为“urn:message:MassTransit.Scheduling:CancelScheduledMessage”。
我是公共交通方面的新手,我只是想举一个人为的例子。
我的传奇故事叫 TaxiToRunway/TaxiingComplete。我的传奇代码
Request(()=>TaxiToRunway, config =>
{
config.Timeout = TimeSpan.FromSeconds(30);
});
...
public Request<PlaneState, TaxiToRunway, TaxiingComplete> TaxiToRunway { get; private set; }
...
Initially(
When(ReadyToDepart)
.Then(context =>
{
context.Saga.Altitude = 0;
context.Saga.Speed = 0;
context.Saga.FlightNo = context.Message.FlightNo;
context.Saga.CorrelationId = context.Message.CorrelationId;
Console.WriteLine($"Flight {context.Message.FlightNo} is ready to depart.");
})
.TransitionTo(Taxiing)
.Request(TaxiToRunway,
(context) => context.Init<TaxiToRunway>(new {CorrelationId = context.Saga.CorrelationId}))
...
During(Taxiing,
Ignore(ReadyToDepart),
When(TaxiToRunway.Completed)
.Then(x =>
{
x.ToString();
})
.TransitionTo(TakingOff),
在附加调试器的情况下,我点击了 x.ToString()
行。
消费者(在不同的主机):
public class TaxiToRunwayConsumer: IConsumer<TaxiToRunway>
{
public async Task Consume(ConsumeContext<TaxiToRunway> context)
{
await context.RespondAsync<TaxiingComplete>(new
{
context.Message.CorrelationId
});
}
}
Saga 启动配置:
cfg.AddSagaStateMachine<PlaneStateMachine, PlaneState>()
.MessageSessionRepository();
cfg.AddServiceBusMessageScheduler();
cfg.UsingAzureServiceBus((context, sbCfg) =>
{
var connectionString = appConfig.ServiceBus.ConnectionString;
sbCfg.Host(connectionString);
EndpointConvention.Map<TaxiToRunway>(new Uri("sb://xxx.servicebus.windows.net/taxi-to-runway"));
sbCfg.UseServiceBusMessageScheduler();
sbCfg.ReceiveEndpoint("plane-state", e =>
{
e.UseInMemoryOutbox();
e.RequiresSession = true;
e.PrefetchCount = 50;
e.MaxConcurrentCalls = 50;
e.ConfigureSaga<PlaneState>(context);
});
sbCfg.ConfigureEndpoints(context);
});
我可以在日志输出中看到:
dbug: MassTransit.Messages[0]
SEND sb://dbpdf-us-dev-sam.servicebus.windows.net/plane-state 80d90000-5d7b-2cf0-7a6b-08da0fd3e7b7 MassTransit.Scheduling.CancelScheduledMessage
我应该把这当作一个事件来处理吗?
这方面的学习曲线肯定很陡峭!我的问题是我需要做什么才能不让这些消息被跳过?
所以,这不起作用的原因:
- 消息会话 saga 存储库只能通过 SessionId 关联,因为它是 session-stored 数据。
- 因此,requestId 必须等于 saga 实例 correlationId(也称为 SessionId)
- 请求发送的超时消息,根据预定消息的序号获取tokenId
- 没有保存在任何地方
- 所以请求超时没有被取消
在这种情况下,正确的方法是使用没有超时的 Request/Response 并使用单独的时间表自行安排超时。
经过大量的努力和反复试验,我设法从我的 saga 发出了一个“请求”,并看到它处理了响应。然而,由于在我所在州的跳过队列中出现一条消息,我的喜悦被打断了。 (我正在使用 azure 服务总线)
类型为“urn:message:MassTransit.Scheduling:CancelScheduledMessage”。
我是公共交通方面的新手,我只是想举一个人为的例子。
我的传奇故事叫 TaxiToRunway/TaxiingComplete。我的传奇代码
Request(()=>TaxiToRunway, config =>
{
config.Timeout = TimeSpan.FromSeconds(30);
});
...
public Request<PlaneState, TaxiToRunway, TaxiingComplete> TaxiToRunway { get; private set; }
...
Initially(
When(ReadyToDepart)
.Then(context =>
{
context.Saga.Altitude = 0;
context.Saga.Speed = 0;
context.Saga.FlightNo = context.Message.FlightNo;
context.Saga.CorrelationId = context.Message.CorrelationId;
Console.WriteLine($"Flight {context.Message.FlightNo} is ready to depart.");
})
.TransitionTo(Taxiing)
.Request(TaxiToRunway,
(context) => context.Init<TaxiToRunway>(new {CorrelationId = context.Saga.CorrelationId}))
...
During(Taxiing,
Ignore(ReadyToDepart),
When(TaxiToRunway.Completed)
.Then(x =>
{
x.ToString();
})
.TransitionTo(TakingOff),
在附加调试器的情况下,我点击了 x.ToString()
行。
消费者(在不同的主机):
public class TaxiToRunwayConsumer: IConsumer<TaxiToRunway>
{
public async Task Consume(ConsumeContext<TaxiToRunway> context)
{
await context.RespondAsync<TaxiingComplete>(new
{
context.Message.CorrelationId
});
}
}
Saga 启动配置:
cfg.AddSagaStateMachine<PlaneStateMachine, PlaneState>()
.MessageSessionRepository();
cfg.AddServiceBusMessageScheduler();
cfg.UsingAzureServiceBus((context, sbCfg) =>
{
var connectionString = appConfig.ServiceBus.ConnectionString;
sbCfg.Host(connectionString);
EndpointConvention.Map<TaxiToRunway>(new Uri("sb://xxx.servicebus.windows.net/taxi-to-runway"));
sbCfg.UseServiceBusMessageScheduler();
sbCfg.ReceiveEndpoint("plane-state", e =>
{
e.UseInMemoryOutbox();
e.RequiresSession = true;
e.PrefetchCount = 50;
e.MaxConcurrentCalls = 50;
e.ConfigureSaga<PlaneState>(context);
});
sbCfg.ConfigureEndpoints(context);
});
我可以在日志输出中看到:
dbug: MassTransit.Messages[0]
SEND sb://dbpdf-us-dev-sam.servicebus.windows.net/plane-state 80d90000-5d7b-2cf0-7a6b-08da0fd3e7b7 MassTransit.Scheduling.CancelScheduledMessage
我应该把这当作一个事件来处理吗?
这方面的学习曲线肯定很陡峭!我的问题是我需要做什么才能不让这些消息被跳过?
所以,这不起作用的原因:
- 消息会话 saga 存储库只能通过 SessionId 关联,因为它是 session-stored 数据。
- 因此,requestId 必须等于 saga 实例 correlationId(也称为 SessionId)
- 请求发送的超时消息,根据预定消息的序号获取tokenId
- 没有保存在任何地方
- 所以请求超时没有被取消
在这种情况下,正确的方法是使用没有超时的 Request/Response 并使用单独的时间表自行安排超时。