等待公共交通传奇完成
Wait for Mass Transit saga to finish
我正在尝试创建一个 saga,return 向调用者提供一些结果,就像 Request/Response 模式一样。如果我调用 Send 方法,但不是通过提交请求,我就可以开始 saga。
因此,saga 逻辑运行良好,但它对客户端没有任何 return 影响。
或者提交的请求由它的消费者处理并且return是对客户端的响应,但永远不会开始传奇。
更新: 的答案似乎不适用于我的问题,原因有二:
1) 我无法通过调用 Request 方法开始 saga;
2) 如果我调用 Send 方法发送请求,稍后发送响应,调用者线程不等待响应返回就继续下一行代码;
[更新结束]
请找到完整代码here。以下是更相关的片段:
这是传奇 class:
public class MySaga : MassTransitStateMachine<MySagaState>
{
public static Uri address = new Uri($"loopback://localhost/req_resp_saga");
public Event<IStartSagaCommand> StartSaga { get; private set; }
public Request<MySagaState, MyRequest, MyResponse> SomeRequest { get; private set; }
public MySaga()
{
InstanceState(s => s.CurrentState);
Event(() => StartSaga,
cc =>
cc.CorrelateBy(state => state.Data, context => context.Message.Data)
.SelectId(context => Guid.NewGuid()));
Request(() => SomeRequest, x => x.NullableCorrelationId, cfg =>
{
cfg.ServiceAddress = address;
cfg.SchedulingServiceAddress = address;
cfg.Timeout = TimeSpan.FromSeconds(30);
});
Initially(
When(StartSaga)
.Then(context =>
{
context.Instance.Data = context.Data.Data;
})
.ThenAsync(
context => Console.Out.WriteLineAsync($"Saga started: " +
$" {context.Data.Data} received"))
.Request(SomeRequest, context => new MyRequest() { CorrelationId = context.Instance.CorrelationId, RequestMessage = "Please do this" })
.TransitionTo(SomeRequest.Pending)
.ThenAsync(context => Console.Out.WriteLineAsync($"Transition completed: " +
$" {(context.Instance.CurrentState == SomeRequest.Pending ? "pending" : "done")} received"))
//.Then(context =>
//{
// var endpoint = context.GetSendEndpoint(address).GetAwaiter().GetResult();
// endpoint.Send(new MyResponse() { CorrelationId = context.Instance.CorrelationId, ResponseMessage = "Your wish is my command" });
//})
);
During(SomeRequest.Pending,
When(SomeRequest.Completed)
.ThenAsync(
context => Console.Out.WriteLineAsync($"Saga ended: " +
$" {context.Data.ResponseMessage} received"))
.Finalize()
);
}
}
这会启动 saga,但不会等待它完成并响应:
var address = new Uri($"loopback://localhost/req_resp_saga");
var endPoint = bus.GetSendEndpoint(address)
.Result;
endPoint.Send<IStartSagaCommand>(new { Data = "Hello World!!" });
这等待响应,但根本不涉及传奇:
var address = new Uri($"loopback://localhost/req_resp_saga");
var requestClient = new MessageRequestClient<MyRequest, MyResponse>(bus, address, TimeSpan.FromSeconds(30));
var response = requestClient.Request(new MyRequest() { CorrelationId = Guid.NewGuid(), RequestMessage = "Please do this" })
.GetAwaiter()
.GetResult();
我怎样才能让调用者启动 saga 并等待它完成并对其响应做些什么?
我自己找到了解决方案。
问题是我对如何通过请求调用触发传奇感到困惑。我以为我必须声明一个
Request<in TInstance, TRequest, TResponse>
(自动)
这对我不起作用。
我用来开始传奇的事件有它自己的界面
Event<IStartSaga>
这与我在调用 Request 方法时使用的不同
var requestClient = new MessageRequestClient<MyRequest, MyResponse>(bus, address, TimeSpan.FromSeconds(30));
var response = requestClient.Request(new MyRequest() { CorrelationId = Guid.NewGuid(), RequestMessage = "Please do this" })
.GetAwaiter()
.GetResult();
因此解决方法是将事件声明更改为
Event<MyRequest>
现在,每当我用 MyResquest 消息调用 Request 时,传奇就开始了。调用者等待 saga 的响应。
我做了一些其他更改以稍微清理代码并将其也推送到 github。
我正在尝试创建一个 saga,return 向调用者提供一些结果,就像 Request/Response 模式一样。如果我调用 Send 方法,但不是通过提交请求,我就可以开始 saga。
因此,saga 逻辑运行良好,但它对客户端没有任何 return 影响。
或者提交的请求由它的消费者处理并且return是对客户端的响应,但永远不会开始传奇。
更新:
1) 我无法通过调用 Request 方法开始 saga;
2) 如果我调用 Send 方法发送请求,稍后发送响应,调用者线程不等待响应返回就继续下一行代码;
[更新结束]
请找到完整代码here。以下是更相关的片段:
这是传奇 class:
public class MySaga : MassTransitStateMachine<MySagaState>
{
public static Uri address = new Uri($"loopback://localhost/req_resp_saga");
public Event<IStartSagaCommand> StartSaga { get; private set; }
public Request<MySagaState, MyRequest, MyResponse> SomeRequest { get; private set; }
public MySaga()
{
InstanceState(s => s.CurrentState);
Event(() => StartSaga,
cc =>
cc.CorrelateBy(state => state.Data, context => context.Message.Data)
.SelectId(context => Guid.NewGuid()));
Request(() => SomeRequest, x => x.NullableCorrelationId, cfg =>
{
cfg.ServiceAddress = address;
cfg.SchedulingServiceAddress = address;
cfg.Timeout = TimeSpan.FromSeconds(30);
});
Initially(
When(StartSaga)
.Then(context =>
{
context.Instance.Data = context.Data.Data;
})
.ThenAsync(
context => Console.Out.WriteLineAsync($"Saga started: " +
$" {context.Data.Data} received"))
.Request(SomeRequest, context => new MyRequest() { CorrelationId = context.Instance.CorrelationId, RequestMessage = "Please do this" })
.TransitionTo(SomeRequest.Pending)
.ThenAsync(context => Console.Out.WriteLineAsync($"Transition completed: " +
$" {(context.Instance.CurrentState == SomeRequest.Pending ? "pending" : "done")} received"))
//.Then(context =>
//{
// var endpoint = context.GetSendEndpoint(address).GetAwaiter().GetResult();
// endpoint.Send(new MyResponse() { CorrelationId = context.Instance.CorrelationId, ResponseMessage = "Your wish is my command" });
//})
);
During(SomeRequest.Pending,
When(SomeRequest.Completed)
.ThenAsync(
context => Console.Out.WriteLineAsync($"Saga ended: " +
$" {context.Data.ResponseMessage} received"))
.Finalize()
);
}
}
这会启动 saga,但不会等待它完成并响应:
var address = new Uri($"loopback://localhost/req_resp_saga");
var endPoint = bus.GetSendEndpoint(address)
.Result;
endPoint.Send<IStartSagaCommand>(new { Data = "Hello World!!" });
这等待响应,但根本不涉及传奇:
var address = new Uri($"loopback://localhost/req_resp_saga");
var requestClient = new MessageRequestClient<MyRequest, MyResponse>(bus, address, TimeSpan.FromSeconds(30));
var response = requestClient.Request(new MyRequest() { CorrelationId = Guid.NewGuid(), RequestMessage = "Please do this" })
.GetAwaiter()
.GetResult();
我怎样才能让调用者启动 saga 并等待它完成并对其响应做些什么?
我自己找到了解决方案。
问题是我对如何通过请求调用触发传奇感到困惑。我以为我必须声明一个
Request<in TInstance, TRequest, TResponse>
(自动)
这对我不起作用。
我用来开始传奇的事件有它自己的界面
Event<IStartSaga>
这与我在调用 Request 方法时使用的不同
var requestClient = new MessageRequestClient<MyRequest, MyResponse>(bus, address, TimeSpan.FromSeconds(30));
var response = requestClient.Request(new MyRequest() { CorrelationId = Guid.NewGuid(), RequestMessage = "Please do this" })
.GetAwaiter()
.GetResult();
因此解决方法是将事件声明更改为
Event<MyRequest>
现在,每当我用 MyResquest 消息调用 Request 时,传奇就开始了。调用者等待 saga 的响应。
我做了一些其他更改以稍微清理代码并将其也推送到 github。