MassTransit Saga - return 当前状态与 REST API 响应

MassTransit Saga - return Current State with REST API response

我正在使用带有 RabbitMQ 传输和 EntityFramework 存储库的 MassTransit Saga 状态机。

我的问题如下 - 当用户点击我的 REST API(.NET 6 Minimal API)中的端点时,作为我业务逻辑的一部分,我将一个事件发布到状态机,改变状态。所以,我想 return 当前(新)状态到 end-user 响应,这是不可能的(或者我找不到解决方案)。 请注意,我的状态机实现在同一个 REST API 项目中。

所以,我有: 最初,我找不到任何可以从存储中获取当前状态的东西(在我的例子中是 SQL 服务器数据库)。所以我创建了一个 EF 方法来执行此操作:

public class StateRepository<StateInstance> : IStateRepository<StateInstance> where StateInstance : SqlServerStateInstance
{
    protected readonly DbContext _stateDBContext;

    public StateRepository(DbContext stateDBContext)
    {
        _stateDBContext = stateDBContext;
    }

    public async Task<StateInstance> GetCurrentStateAsync (Guid correlationId)
    {
        return await _stateDBContext.Set<StateInstance>().Where(i => i.CorrelationId == correlationId).FirstOrDefaultAsync();
    }
}

哪个工作正常...如果我已经设置了状态。 :)

所以,我添加了一个中间件,它调用存储库通过ID获取状态并将其设置在响应的header:

internal class StateMiddleware
{
    private readonly RequestDelegate _next;

    public StateMiddleware(RequestDelegate next, ILogger<StateMiddleware> logger)
    {
        _next = next;
        _logger = logger;
    }
    
    public async Task Invoke(HttpContext context, IStateHandler stateHandler)
    {
        await _next(context);
        
        string state = await stateHandler.GetCurrentState(correlationID);
        context.Response.Headers.Add("X-State", state ?? "NULL?!?");
    }
}

已注册WebApplication.UseMiddleware<StateMiddleware>();

但是当我发布一个事件(从任何端点)来更新状态时,比如:

app.MapPost(RoutePrefix + "/sm/call-event/{userId}", async (IPublishEndpoint _publishEndpoint, Guid userId, StateEvents evn) =>
        {
            switch (evn)
            {
                case StateEvents.Initiate:
                    {
                        await _publishEndpoint.Publish<StateInstance>(new
                        {
                            CorrelationId = userId
                        }, default);
                        break;
                    }
                case StateEvents.CollectEmail:
                    {
                        await _publishEndpoint.Publish<CollectEmailData>(new
                        {
                            ActiveAccountExists = false,
                            CorrelationId = userId
                        }, default);
                        break;
                    }
                case StateEvents.ConfirmPass:
                    {
                        await _publishEndpoint.Publish<ConfirmPass>(new
                        {
                            CorrelationId = userId
                        }, default);
                        break;
                    }
            }
            return Results.Ok();
        })

状态为 NULL(如果我使用 StateEvents.Initiate 调用端点)或先前的值(使用枚举中的任何其他值)。

这是我的公共交通配置:

services.AddMassTransit(cfg=> {
            cfg.AddSagaStateMachine<MyStateMachine, MyStateInstance>()
                .EntityFrameworkRepository(r =>
                {
                    r.ConcurrencyMode = ConcurrencyMode.Optimistic; //requires RowVersion!

                    r.AddDbContext<DbContext, StateRepository>((provider, builder) =>
                    {
                        builder.UseSqlServer(connectionString, m =>
                        {
                            m.MigrationsAssembly(typeof(StateRepository).Assembly.GetName().Name);
                            m.MigrationsHistoryTable($"__{typeof(StateRepository).Name}");
                        });
                    });
                });
            cfg.UsingRabbitMq((context, x) =>
            {
                x.ConfigureEndpoints(context);
                x.Host("localhost",
                    h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    }
                );
            });
        });
        services.AddMassTransitHostedService();

我知道 State Machine 是 event-based,它是异步的,但是有什么解决方法吗?

谢谢!

您应该永远不要直接访问那个 saga 存储库以获取此类信息。实例的当前状态和任何其他数据都属于 saga。尝试 read/access 状态,或者在最坏的情况下修改状态,都是被误导的,并且可能会引入各种不一致。

MassTransit 有一个简单的工具来访问 saga 的状态。您可以定义一个请求契约,由 saga 处理,允许 saga 本身以当前 state/details 响应。该请求甚至可以指定为只读,这样它就不会修改状态(或尝试保存未修改的状态)。

下面显示的示例具有响应请求的状态检查:

class ReadOnlyStateMachine :
    MassTransitStateMachine<ReadOnlyInstance>
{
    public ReadOnlyStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => StatusCheckRequested, x =>
        {
            x.ReadOnly = true;
        });

        Initially(
            When(Started)
                .Then(context => context.Instance.StatusText = "Started")
                .Respond(context => new StartupComplete {CorrelationId = context.Instance.CorrelationId})
                .TransitionTo(Running)
        );

        During(Running,
            When(StatusCheckRequested)
                .Respond(context => new Status
                {
                    CorrelationId = context.Instance.CorrelationId,
                    StatusText = context.Instance.StatusText
                })
                .Then(context => context.Instance.StatusText = "Running") // this change won't be saved
        );
    }

    public State Running { get; private set; }
    public Event<Start> Started { get; private set; }
    public Event<CheckStatus> StatusCheckRequested { get; private set; }
}

在controller中,简单的使用request client获取状态机的状态:

public class SomeController
{
    public SomeController(IRequestClient<CheckStatus> client)
    {
        _client = client;
    }

    public async Task<IActionResult> DoSomething(Guid id)
    {
        var response = await _client.GetResponse<Status>(new { CorrelationId = id});
    }
}

这应该足以让您入门。如果没有,here is a controller example, and the corresponding event and handler。 (示例没有在事件上设置 ReadOnly 属性,但您应该添加它)。