MassTransit saga/statemachine 未将请求响应返回给控制器
MassTransit saga/statemachine not returning the request response to the controller
我觉得我快要完成这项工作了,但似乎无法完成...
我有一个 .NET Core ASP 应用程序和 saga/state 机器,它似乎在大多数情况下都运行良好。它:
- 收到请求
- 发布由路由名单拾取的事件
- 传送名单完成后,它会发布一个事件
- 事件拾起传奇
- 然后 saga 向 request/response 消费者发送请求
- 响应回到传奇
- 但是然后我使用 RespondAsync 尝试将响应发送回原始调用控制器,但是没有任何返回结果
我的控制器看起来像:
private readonly IRequestClient<IRequestLink, IRequestLinkCompleted> _client;
public async Task<IActionResult> CreateLinkAsync([FromBody]CreateLinkRequest request)
{
var requestLink = new RequestLink {
GroupName = $"{request.Name} Group",
Name = request.Name,
LinkId = Guid.NewGuid()
};
var result = await _client.Request(requestLink).ConfigureAwait(false);
return Ok();
}
我的故事的精简版如下:
Request(() => LinkRequest, x => x.RequestId, cfg =>
{
cfg.ServiceAddress = new Uri($"rabbitmq://localhost/request_end_point_name");
cfg.SchedulingServiceAddress = new Uri($"rabbitmq://localhost/request_end_point_name");
cfg.Timeout = TimeSpan.FromSeconds(30);
});
During(RequestReceived,
When(LinkCreatedEvent)
.Request(LinkRequest, context => new SelectUrlByPublicId(context.Data.DatabaseId, context.Data.LinkId))
.TransitionTo(LinkRequest.Pending));
During(LinkRequest.Pending,
When(LinkRequest.Completed)
.ThenAsync(context => context.RespondAsync(new RequestLinkCompleted
{
CorrelationId = LinkRequest.GetRequestId(context.Instance),
DatabaseId = context.Data.DatabaseId
}))
.Finalize());
最后,在我的启动代码中,我这样配置 request/response:
services.AddScoped<IRequestClient<IRequestLink, IRequestLinkCompleted>>(x => new MessageRequestClient<IRequestLink, IRequestLinkCompleted>(_bus, new Uri($"{messageBusSettings.Host}/create_link_saga"), TimeSpan.FromSeconds(30)));
我猜 RespondAsync 调用没有使用 right/original requestId,但我不知道如何检查或更改它。有人可以帮忙吗?
由于丢失了原始请求的上下文,您基本上需要自己完成 RespondAsync
中所做的事情。
- 从请求消息上下文
中保存ResponseAddress
- 从相同的上下文中保存
RequestId
在您的 saga 中,当需要响应时,您需要使用 context.GetSendEndpoint(context.Instance.SavedResponseAddress)
然后调用 Send
设置委托中的 RequestId
以匹配原始保存的 RequestId上下文。
现在,您可能需要将这些保存在路由选择变量中,因为您的 saga 没有获得命令,只有后续事件,但最终效果是一样的,原始请求消息已经消失并且从未被看到传奇。
对于那些想要更清楚地了解一些代码片段以使其正常工作的人,这是我们所做的工作,运行 传奇向初始请求者返回响应。我将上面的答案标记为正确的,但是我们仍然花了几个小时来让我们的实现工作,因为我们对 sagas 还很陌生,并且无法弄清楚从哪里访问原始上下文而不仅仅是我们的消息,因为我们实现传奇结构的方式。希望这能在将来为其他人节省一些时间。
初始 Saga 设置:
public PaymentStateMachine()
{
InstanceState(x => x.CurrentState);
this.ConfigureCorrelationIds();
.....
//Saga workflow registration
.....
this.During(ValidationSucceeded
, SetPaymentGatewaySubmissionHandler());
SetCompletedWhenFinalized();
}
我们必须注册初始事件,以便将初始消息中的一些信息保存到 saga table 中。需要注意的关键部分是 context.responseAddress 和 context.RequestId 被保存到传奇工厂中。
private void ConfigureCorrelationIds()
{
Event(() => PaymentSubmittedEvent,
x =>
{
x.CorrelateBy<int>(pay => pay.OrderId, context => context.Message.Order.Id)
.SelectId(c => c.Message.CorrelationId);
x.InsertOnInitial = true;
x.SetSagaFactory(context => new PaymentSagaState()
{
ResponseAddress = context.ResponseAddress.ToString(),
CorrelationId = context.CorrelationId.Value,
RequestId = context.RequestId,
OrderId = context.Message.Order.Id,
Created = DateTime.Now,
Updated = DateTime.Now,
CurrentState = Initial.Name
});
});
....
//Other events registered
}
从那里开始,我们使用上面标记的答案将响应发送回原始消息的响应地址,当我们完成我们的 saga 处理后。 new PaymentSubmittedResponse 只是客户端在前端等待的消息合约 saga 中的私有实现。
private EventActivityBinder<PaymentSagaState, IPaymentGatewaySubmittedEvent> SetPaymentGatewaySubmissionHandler() =>
When(PaymentGatewaySubmittedEvent)
.Then(c => this.UpdateSagaState(c, PaymentGatewayComplete.Name))
.Then(c => Console.Out.WriteLineAsync(
$"{DateTime.Now} PaymentGatewayComplete: {c.Data.Status} to {c.Instance.CorrelationId}"))
.ThenAsync(async c =>
{
//Send response back to orignial requestor once we are done with this step
var responseEndpoint =
await c.GetSendEndpoint(new Uri(c.Instance.ResponseAddress));
await responseEndpoint.Send(new PaymentSubmittedResponse(c.Data), callback: sendContext => sendContext.RequestId = c.Instance.RequestId);
})
.TransitionTo(ClientPaymentSubmissionResponded);
private void UpdateSagaState(BehaviorContext<PaymentSagaState> sagaContext, string status)
{
sagaContext.Instance.CurrentState = status;
sagaContext.Instance.Updated = DateTime.Now;
}
private class PaymentSubmittedResponse : IPaymentSubmittedEvent
{
public string Status { get; set; }
public OrderDto Order { get; set; }
public Guid CorrelationId { get; set; }
public DateTime TimeStamp { get; set; }
public PaymentSubmittedResponse(IPaymentBase paymentMessage)
{
Order = paymentMessage.Order;
CorrelationId = paymentMessage.CorrelationId;
Status = paymentMessage.Status;
TimeStamp = paymentMessage.TimeStamp;
}
}
不确定是否完全需要它,或者仅仅是因为我们如何实现它,但我们不得不引入另一种传奇状态 ClientPaymentSubmissionResponded 来处理响应消息事件发送回原始请求。
我觉得我快要完成这项工作了,但似乎无法完成...
我有一个 .NET Core ASP 应用程序和 saga/state 机器,它似乎在大多数情况下都运行良好。它:
- 收到请求
- 发布由路由名单拾取的事件
- 传送名单完成后,它会发布一个事件
- 事件拾起传奇
- 然后 saga 向 request/response 消费者发送请求
- 响应回到传奇
- 但是然后我使用 RespondAsync 尝试将响应发送回原始调用控制器,但是没有任何返回结果
我的控制器看起来像:
private readonly IRequestClient<IRequestLink, IRequestLinkCompleted> _client;
public async Task<IActionResult> CreateLinkAsync([FromBody]CreateLinkRequest request)
{
var requestLink = new RequestLink {
GroupName = $"{request.Name} Group",
Name = request.Name,
LinkId = Guid.NewGuid()
};
var result = await _client.Request(requestLink).ConfigureAwait(false);
return Ok();
}
我的故事的精简版如下:
Request(() => LinkRequest, x => x.RequestId, cfg =>
{
cfg.ServiceAddress = new Uri($"rabbitmq://localhost/request_end_point_name");
cfg.SchedulingServiceAddress = new Uri($"rabbitmq://localhost/request_end_point_name");
cfg.Timeout = TimeSpan.FromSeconds(30);
});
During(RequestReceived,
When(LinkCreatedEvent)
.Request(LinkRequest, context => new SelectUrlByPublicId(context.Data.DatabaseId, context.Data.LinkId))
.TransitionTo(LinkRequest.Pending));
During(LinkRequest.Pending,
When(LinkRequest.Completed)
.ThenAsync(context => context.RespondAsync(new RequestLinkCompleted
{
CorrelationId = LinkRequest.GetRequestId(context.Instance),
DatabaseId = context.Data.DatabaseId
}))
.Finalize());
最后,在我的启动代码中,我这样配置 request/response:
services.AddScoped<IRequestClient<IRequestLink, IRequestLinkCompleted>>(x => new MessageRequestClient<IRequestLink, IRequestLinkCompleted>(_bus, new Uri($"{messageBusSettings.Host}/create_link_saga"), TimeSpan.FromSeconds(30)));
我猜 RespondAsync 调用没有使用 right/original requestId,但我不知道如何检查或更改它。有人可以帮忙吗?
由于丢失了原始请求的上下文,您基本上需要自己完成 RespondAsync
中所做的事情。
- 从请求消息上下文 中保存
- 从相同的上下文中保存
RequestId
ResponseAddress
在您的 saga 中,当需要响应时,您需要使用 context.GetSendEndpoint(context.Instance.SavedResponseAddress)
然后调用 Send
设置委托中的 RequestId
以匹配原始保存的 RequestId上下文。
现在,您可能需要将这些保存在路由选择变量中,因为您的 saga 没有获得命令,只有后续事件,但最终效果是一样的,原始请求消息已经消失并且从未被看到传奇。
对于那些想要更清楚地了解一些代码片段以使其正常工作的人,这是我们所做的工作,运行 传奇向初始请求者返回响应。我将上面的答案标记为正确的,但是我们仍然花了几个小时来让我们的实现工作,因为我们对 sagas 还很陌生,并且无法弄清楚从哪里访问原始上下文而不仅仅是我们的消息,因为我们实现传奇结构的方式。希望这能在将来为其他人节省一些时间。
初始 Saga 设置:
public PaymentStateMachine()
{
InstanceState(x => x.CurrentState);
this.ConfigureCorrelationIds();
.....
//Saga workflow registration
.....
this.During(ValidationSucceeded
, SetPaymentGatewaySubmissionHandler());
SetCompletedWhenFinalized();
}
我们必须注册初始事件,以便将初始消息中的一些信息保存到 saga table 中。需要注意的关键部分是 context.responseAddress 和 context.RequestId 被保存到传奇工厂中。
private void ConfigureCorrelationIds()
{
Event(() => PaymentSubmittedEvent,
x =>
{
x.CorrelateBy<int>(pay => pay.OrderId, context => context.Message.Order.Id)
.SelectId(c => c.Message.CorrelationId);
x.InsertOnInitial = true;
x.SetSagaFactory(context => new PaymentSagaState()
{
ResponseAddress = context.ResponseAddress.ToString(),
CorrelationId = context.CorrelationId.Value,
RequestId = context.RequestId,
OrderId = context.Message.Order.Id,
Created = DateTime.Now,
Updated = DateTime.Now,
CurrentState = Initial.Name
});
});
....
//Other events registered
}
从那里开始,我们使用上面标记的答案将响应发送回原始消息的响应地址,当我们完成我们的 saga 处理后。 new PaymentSubmittedResponse 只是客户端在前端等待的消息合约 saga 中的私有实现。
private EventActivityBinder<PaymentSagaState, IPaymentGatewaySubmittedEvent> SetPaymentGatewaySubmissionHandler() =>
When(PaymentGatewaySubmittedEvent)
.Then(c => this.UpdateSagaState(c, PaymentGatewayComplete.Name))
.Then(c => Console.Out.WriteLineAsync(
$"{DateTime.Now} PaymentGatewayComplete: {c.Data.Status} to {c.Instance.CorrelationId}"))
.ThenAsync(async c =>
{
//Send response back to orignial requestor once we are done with this step
var responseEndpoint =
await c.GetSendEndpoint(new Uri(c.Instance.ResponseAddress));
await responseEndpoint.Send(new PaymentSubmittedResponse(c.Data), callback: sendContext => sendContext.RequestId = c.Instance.RequestId);
})
.TransitionTo(ClientPaymentSubmissionResponded);
private void UpdateSagaState(BehaviorContext<PaymentSagaState> sagaContext, string status)
{
sagaContext.Instance.CurrentState = status;
sagaContext.Instance.Updated = DateTime.Now;
}
private class PaymentSubmittedResponse : IPaymentSubmittedEvent
{
public string Status { get; set; }
public OrderDto Order { get; set; }
public Guid CorrelationId { get; set; }
public DateTime TimeStamp { get; set; }
public PaymentSubmittedResponse(IPaymentBase paymentMessage)
{
Order = paymentMessage.Order;
CorrelationId = paymentMessage.CorrelationId;
Status = paymentMessage.Status;
TimeStamp = paymentMessage.TimeStamp;
}
}
不确定是否完全需要它,或者仅仅是因为我们如何实现它,但我们不得不引入另一种传奇状态 ClientPaymentSubmissionResponded 来处理响应消息事件发送回原始请求。