MassTransit Saga - return 当前状态与 REST API 响应
MassTransit Saga - return Current State with REST API response
我正在使用带有 RabbitMQ 传输和 EntityFramework 存储库的 MassTransit Saga 状态机。
我的问题如下 - 当用户点击我的 REST API(.NET 6 Minimal API)中的端点时,作为我业务逻辑的一部分,我将一个事件发布到状态机,改变状态。所以,我想 return 当前(新)状态到 end-user 响应,这是不可能的(或者我找不到解决方案)。
请注意,我的状态机实现在同一个 REST API 项目中。
所以,我有:
最初,我找不到任何可以从存储中获取当前状态的东西(在我的例子中是 SQL 服务器数据库)。所以我创建了一个 EF 方法来执行此操作:
public class StateRepository<StateInstance> : IStateRepository<StateInstance> where StateInstance : SqlServerStateInstance
{
protected readonly DbContext _stateDBContext;
public StateRepository(DbContext stateDBContext)
{
_stateDBContext = stateDBContext;
}
public async Task<StateInstance> GetCurrentStateAsync (Guid correlationId)
{
return await _stateDBContext.Set<StateInstance>().Where(i => i.CorrelationId == correlationId).FirstOrDefaultAsync();
}
}
哪个工作正常...如果我已经设置了状态。 :)
所以,我添加了一个中间件,它调用存储库通过ID获取状态并将其设置在响应的header:
internal class StateMiddleware
{
private readonly RequestDelegate _next;
public StateMiddleware(RequestDelegate next, ILogger<StateMiddleware> logger)
{
_next = next;
_logger = logger;
}
public async Task Invoke(HttpContext context, IStateHandler stateHandler)
{
await _next(context);
string state = await stateHandler.GetCurrentState(correlationID);
context.Response.Headers.Add("X-State", state ?? "NULL?!?");
}
}
已注册WebApplication.UseMiddleware<StateMiddleware>();
但是当我发布一个事件(从任何端点)来更新状态时,比如:
app.MapPost(RoutePrefix + "/sm/call-event/{userId}", async (IPublishEndpoint _publishEndpoint, Guid userId, StateEvents evn) =>
{
switch (evn)
{
case StateEvents.Initiate:
{
await _publishEndpoint.Publish<StateInstance>(new
{
CorrelationId = userId
}, default);
break;
}
case StateEvents.CollectEmail:
{
await _publishEndpoint.Publish<CollectEmailData>(new
{
ActiveAccountExists = false,
CorrelationId = userId
}, default);
break;
}
case StateEvents.ConfirmPass:
{
await _publishEndpoint.Publish<ConfirmPass>(new
{
CorrelationId = userId
}, default);
break;
}
}
return Results.Ok();
})
状态为 NULL
(如果我使用 StateEvents.Initiate
调用端点)或先前的值(使用枚举中的任何其他值)。
这是我的公共交通配置:
services.AddMassTransit(cfg=> {
cfg.AddSagaStateMachine<MyStateMachine, MyStateInstance>()
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Optimistic; //requires RowVersion!
r.AddDbContext<DbContext, StateRepository>((provider, builder) =>
{
builder.UseSqlServer(connectionString, m =>
{
m.MigrationsAssembly(typeof(StateRepository).Assembly.GetName().Name);
m.MigrationsHistoryTable($"__{typeof(StateRepository).Name}");
});
});
});
cfg.UsingRabbitMq((context, x) =>
{
x.ConfigureEndpoints(context);
x.Host("localhost",
h =>
{
h.Username("guest");
h.Password("guest");
}
);
});
});
services.AddMassTransitHostedService();
我知道 State Machine 是 event-based,它是异步的,但是有什么解决方法吗?
谢谢!
您应该永远不要直接访问那个 saga 存储库以获取此类信息。实例的当前状态和任何其他数据都属于 saga。尝试 read/access 状态,或者在最坏的情况下修改状态,都是被误导的,并且可能会引入各种不一致。
MassTransit 有一个简单的工具来访问 saga 的状态。您可以定义一个请求契约,由 saga 处理,允许 saga 本身以当前 state/details 响应。该请求甚至可以指定为只读,这样它就不会修改状态(或尝试保存未修改的状态)。
下面显示的示例具有响应请求的状态检查:
class ReadOnlyStateMachine :
MassTransitStateMachine<ReadOnlyInstance>
{
public ReadOnlyStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => StatusCheckRequested, x =>
{
x.ReadOnly = true;
});
Initially(
When(Started)
.Then(context => context.Instance.StatusText = "Started")
.Respond(context => new StartupComplete {CorrelationId = context.Instance.CorrelationId})
.TransitionTo(Running)
);
During(Running,
When(StatusCheckRequested)
.Respond(context => new Status
{
CorrelationId = context.Instance.CorrelationId,
StatusText = context.Instance.StatusText
})
.Then(context => context.Instance.StatusText = "Running") // this change won't be saved
);
}
public State Running { get; private set; }
public Event<Start> Started { get; private set; }
public Event<CheckStatus> StatusCheckRequested { get; private set; }
}
在controller中,简单的使用request client获取状态机的状态:
public class SomeController
{
public SomeController(IRequestClient<CheckStatus> client)
{
_client = client;
}
public async Task<IActionResult> DoSomething(Guid id)
{
var response = await _client.GetResponse<Status>(new { CorrelationId = id});
}
}
这应该足以让您入门。如果没有,here is a controller example, and the corresponding event and handler。 (示例没有在事件上设置 ReadOnly 属性,但您应该添加它)。
我正在使用带有 RabbitMQ 传输和 EntityFramework 存储库的 MassTransit Saga 状态机。
我的问题如下 - 当用户点击我的 REST API(.NET 6 Minimal API)中的端点时,作为我业务逻辑的一部分,我将一个事件发布到状态机,改变状态。所以,我想 return 当前(新)状态到 end-user 响应,这是不可能的(或者我找不到解决方案)。 请注意,我的状态机实现在同一个 REST API 项目中。
所以,我有: 最初,我找不到任何可以从存储中获取当前状态的东西(在我的例子中是 SQL 服务器数据库)。所以我创建了一个 EF 方法来执行此操作:
public class StateRepository<StateInstance> : IStateRepository<StateInstance> where StateInstance : SqlServerStateInstance
{
protected readonly DbContext _stateDBContext;
public StateRepository(DbContext stateDBContext)
{
_stateDBContext = stateDBContext;
}
public async Task<StateInstance> GetCurrentStateAsync (Guid correlationId)
{
return await _stateDBContext.Set<StateInstance>().Where(i => i.CorrelationId == correlationId).FirstOrDefaultAsync();
}
}
哪个工作正常...如果我已经设置了状态。 :)
所以,我添加了一个中间件,它调用存储库通过ID获取状态并将其设置在响应的header:
internal class StateMiddleware
{
private readonly RequestDelegate _next;
public StateMiddleware(RequestDelegate next, ILogger<StateMiddleware> logger)
{
_next = next;
_logger = logger;
}
public async Task Invoke(HttpContext context, IStateHandler stateHandler)
{
await _next(context);
string state = await stateHandler.GetCurrentState(correlationID);
context.Response.Headers.Add("X-State", state ?? "NULL?!?");
}
}
已注册WebApplication.UseMiddleware<StateMiddleware>();
但是当我发布一个事件(从任何端点)来更新状态时,比如:
app.MapPost(RoutePrefix + "/sm/call-event/{userId}", async (IPublishEndpoint _publishEndpoint, Guid userId, StateEvents evn) =>
{
switch (evn)
{
case StateEvents.Initiate:
{
await _publishEndpoint.Publish<StateInstance>(new
{
CorrelationId = userId
}, default);
break;
}
case StateEvents.CollectEmail:
{
await _publishEndpoint.Publish<CollectEmailData>(new
{
ActiveAccountExists = false,
CorrelationId = userId
}, default);
break;
}
case StateEvents.ConfirmPass:
{
await _publishEndpoint.Publish<ConfirmPass>(new
{
CorrelationId = userId
}, default);
break;
}
}
return Results.Ok();
})
状态为 NULL
(如果我使用 StateEvents.Initiate
调用端点)或先前的值(使用枚举中的任何其他值)。
这是我的公共交通配置:
services.AddMassTransit(cfg=> {
cfg.AddSagaStateMachine<MyStateMachine, MyStateInstance>()
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Optimistic; //requires RowVersion!
r.AddDbContext<DbContext, StateRepository>((provider, builder) =>
{
builder.UseSqlServer(connectionString, m =>
{
m.MigrationsAssembly(typeof(StateRepository).Assembly.GetName().Name);
m.MigrationsHistoryTable($"__{typeof(StateRepository).Name}");
});
});
});
cfg.UsingRabbitMq((context, x) =>
{
x.ConfigureEndpoints(context);
x.Host("localhost",
h =>
{
h.Username("guest");
h.Password("guest");
}
);
});
});
services.AddMassTransitHostedService();
我知道 State Machine 是 event-based,它是异步的,但是有什么解决方法吗?
谢谢!
您应该永远不要直接访问那个 saga 存储库以获取此类信息。实例的当前状态和任何其他数据都属于 saga。尝试 read/access 状态,或者在最坏的情况下修改状态,都是被误导的,并且可能会引入各种不一致。
MassTransit 有一个简单的工具来访问 saga 的状态。您可以定义一个请求契约,由 saga 处理,允许 saga 本身以当前 state/details 响应。该请求甚至可以指定为只读,这样它就不会修改状态(或尝试保存未修改的状态)。
下面显示的示例具有响应请求的状态检查:
class ReadOnlyStateMachine :
MassTransitStateMachine<ReadOnlyInstance>
{
public ReadOnlyStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => StatusCheckRequested, x =>
{
x.ReadOnly = true;
});
Initially(
When(Started)
.Then(context => context.Instance.StatusText = "Started")
.Respond(context => new StartupComplete {CorrelationId = context.Instance.CorrelationId})
.TransitionTo(Running)
);
During(Running,
When(StatusCheckRequested)
.Respond(context => new Status
{
CorrelationId = context.Instance.CorrelationId,
StatusText = context.Instance.StatusText
})
.Then(context => context.Instance.StatusText = "Running") // this change won't be saved
);
}
public State Running { get; private set; }
public Event<Start> Started { get; private set; }
public Event<CheckStatus> StatusCheckRequested { get; private set; }
}
在controller中,简单的使用request client获取状态机的状态:
public class SomeController
{
public SomeController(IRequestClient<CheckStatus> client)
{
_client = client;
}
public async Task<IActionResult> DoSomething(Guid id)
{
var response = await _client.GetResponse<Status>(new { CorrelationId = id});
}
}
这应该足以让您入门。如果没有,here is a controller example, and the corresponding event and handler。 (示例没有在事件上设置 ReadOnly 属性,但您应该添加它)。