MassTransit saga/statemachine 未将请求响应返回给控制器

MassTransit saga/statemachine not returning the request response to the controller

我觉得我快要完成这项工作了,但似乎无法完成...

我有一个 .NET Core ASP 应用程序和 saga/state 机器,它似乎在大多数情况下都运行良好。它:

  1. 收到请求
  2. 发布由路由名单拾取的事件
  3. 传送名单完成后,它会发布一个事件
  4. 事件拾起传奇
  5. 然后 saga 向 request/response 消费者发送请求
  6. 响应回到传奇
  7. 但是然后我使用 RespondAsync 尝试将响应发送回原始调用控制器,但是没有任何返回结果

我的控制器看起来像:

private readonly IRequestClient<IRequestLink, IRequestLinkCompleted> _client;

public async Task<IActionResult> CreateLinkAsync([FromBody]CreateLinkRequest request)
{
        var requestLink = new RequestLink            {

            GroupName = $"{request.Name} Group",
            Name = request.Name,
            LinkId = Guid.NewGuid()
        };

        var result = await _client.Request(requestLink).ConfigureAwait(false);

        return Ok();
}

我的故事的精简版如下:

Request(() => LinkRequest, x => x.RequestId, cfg =>
        {
            cfg.ServiceAddress = new Uri($"rabbitmq://localhost/request_end_point_name");
            cfg.SchedulingServiceAddress = new Uri($"rabbitmq://localhost/request_end_point_name");
            cfg.Timeout = TimeSpan.FromSeconds(30);
        });


During(RequestReceived,
                When(LinkCreatedEvent)
                    .Request(LinkRequest, context => new SelectUrlByPublicId(context.Data.DatabaseId, context.Data.LinkId))
                    .TransitionTo(LinkRequest.Pending));

            During(LinkRequest.Pending,                
                When(LinkRequest.Completed)
                    .ThenAsync(context => context.RespondAsync(new RequestLinkCompleted
                    {
                        CorrelationId = LinkRequest.GetRequestId(context.Instance),
                        DatabaseId = context.Data.DatabaseId
                    }))
                    .Finalize());

最后,在我的启动代码中,我这样配置 request/response:

services.AddScoped<IRequestClient<IRequestLink, IRequestLinkCompleted>>(x => new MessageRequestClient<IRequestLink, IRequestLinkCompleted>(_bus, new Uri($"{messageBusSettings.Host}/create_link_saga"), TimeSpan.FromSeconds(30)));

我猜 RespondAsync 调用没有使用 right/original requestId,但我不知道如何检查或更改它。有人可以帮忙吗?

由于丢失了原始请求的上下文,您基本上需要自己完成 RespondAsync 中所做的事情。

  1. 从请求消息上下文
  2. 中保存ResponseAddress
  3. 从相同的上下文中保存 RequestId

在您的 saga 中,当需要响应时,您需要使用 context.GetSendEndpoint(context.Instance.SavedResponseAddress) 然后调用 Send 设置委托中的 RequestId 以匹配原始保存的 RequestId上下文。

现在,您可能需要将这些保存在路由选择变量中,因为您的 saga 没有获得命令,只有后续事件,但最终效果是一样的,原始请求消息已经消失并且从未被看到传奇。

对于那些想要更清楚地了解一些代码片段以使其正常工作的人,这是我们所做的工作,运行 传奇向初始请求者返回响应。我将上面的答案标记为正确的,但是我们仍然花了几个小时来让我们的实现工作,因为我们对 sagas 还很陌生,并且无法弄清楚从哪里访问原始上下文而不仅仅是我们的消息,因为我们实现传奇结构的方式。希望这能在将来为其他人节省一些时间。

初始 Saga 设置:

 public PaymentStateMachine()
        {
            InstanceState(x => x.CurrentState);

            this.ConfigureCorrelationIds();
.....
//Saga workflow registration
.....

            this.During(ValidationSucceeded
                , SetPaymentGatewaySubmissionHandler());            

            SetCompletedWhenFinalized();
        }

我们必须注册初始事件,以便将初始消息中的一些信息保存到 saga table 中。需要注意的关键部分是 context.responseAddresscontext.RequestId 被保存到传奇工厂中。

private void ConfigureCorrelationIds()
{
Event(() => PaymentSubmittedEvent,
                x =>
                {

                    x.CorrelateBy<int>(pay => pay.OrderId, context => context.Message.Order.Id)
                    .SelectId(c => c.Message.CorrelationId);

                    x.InsertOnInitial = true;

                    x.SetSagaFactory(context => new PaymentSagaState()
                    {
                        ResponseAddress = context.ResponseAddress.ToString(),
                        CorrelationId = context.CorrelationId.Value,
                        RequestId = context.RequestId,
                        OrderId = context.Message.Order.Id,
                        Created = DateTime.Now,
                        Updated = DateTime.Now,
                        CurrentState = Initial.Name
                    });
                });
....
//Other events registered
}

从那里开始,我们使用上面标记的答案将响应发送回原始消息的响应地址,当我们完成我们的 saga 处理后。 new PaymentSubmittedResponse 只是客户端在前端等待的消息合约 saga 中的私有实现。

private EventActivityBinder<PaymentSagaState, IPaymentGatewaySubmittedEvent> SetPaymentGatewaySubmissionHandler() =>
            When(PaymentGatewaySubmittedEvent)
                .Then(c => this.UpdateSagaState(c, PaymentGatewayComplete.Name))
                .Then(c => Console.Out.WriteLineAsync(
                    $"{DateTime.Now} PaymentGatewayComplete: {c.Data.Status} to {c.Instance.CorrelationId}"))
                .ThenAsync(async c =>
                {              
                    //Send response back to orignial requestor once we are done with this step               
                    var responseEndpoint =
                            await c.GetSendEndpoint(new Uri(c.Instance.ResponseAddress));

                    await responseEndpoint.Send(new PaymentSubmittedResponse(c.Data), callback: sendContext => sendContext.RequestId = c.Instance.RequestId);
                })
            .TransitionTo(ClientPaymentSubmissionResponded);

private void UpdateSagaState(BehaviorContext<PaymentSagaState> sagaContext, string status)
        {
            sagaContext.Instance.CurrentState = status;
            sagaContext.Instance.Updated = DateTime.Now;
        }

private class PaymentSubmittedResponse : IPaymentSubmittedEvent
        {
            public string Status { get; set; }

            public OrderDto Order { get; set; }

            public Guid CorrelationId { get; set; }
            public DateTime TimeStamp { get; set; }

            public PaymentSubmittedResponse(IPaymentBase paymentMessage)
            {
                Order = paymentMessage.Order;
                CorrelationId = paymentMessage.CorrelationId;
                Status = paymentMessage.Status;
                TimeStamp = paymentMessage.TimeStamp;
            }
        }

不确定是否完全需要它,或者仅仅是因为我们如何实现它,但我们不得不引入另一种传奇状态 ClientPaymentSubmissionResponded 来处理响应消息事件发送回原始请求。