MassTransit Activity 参数错误
MassTransit Activity Fault with parameters
我目前正在使用 Courier 模式的公共交通。
我已经设置了一个可能会失败的 Activity,我希望能够订阅此失败并采取相应措施。
我的问题是,即使我可以订阅失败,甚至看到导致失败的异常,我也无法向它传递任何参数。
出于测试目的,假设我有以下 activity:
public class MyActivity : ExecuteActivity<MyMessage>
{
public Task<ExecutionResult> Execute(ExecuteContext<MyMessage> context)
{
try
{
// .... some code
throw new FaultException<RegistrationRefusedData>(
new RegistrationRefusedData(RegistrationRefusedReason.ItemUnavailable));
// .... some code
}
catch (Exception ex)
{
return Task.FromResult(context.Faulted(ex));
}
}
}
问题出在我作为异常参数传递的原因 (RegistrationRefusedReason
) 中。如果我订阅了一个 RoutingSlipActivityFaulted
消费者,我几乎可以 获得我需要的所有信息:
public class ActivityFaultedConsumer : IMessageConsumer<RoutingSlipActivityFaulted>
{
public void Consume(RoutingSlipActivityFaulted message)
{
string exceptionMessage = message.ExceptionInfo.Message; // OK
string messageType = message.ExceptionInfo.ExceptionType; // OK
RegistrationRefusedReason reason = ??????;
}
}
我觉得我在这里遗漏了一些重要的东西,(也许误用了模式?)。
是否有任何其他方法可以从故障 activity 获取参数?
因此,您描述的案例不是 Fault
。不符合商业条件是失败的。在这种情况下,您不想重试交易,而是想终止它。要通知传送名单的发起人,您需要 Publish
一个业务事件,表明交易因业务状况而未完成。
例如,在您的情况下,您可以执行以下操作:
context.Publish<RegistrationRefused>(new {
CustomerId = xxx,
ItemId = xxxx,
Reason = "Item was unavailable"
});
context.Terminate();
这将终止路由单(后续活动将不会执行),并产生一个 RoutingSlipTerminated
事件。
这是根据业务条件或规则结束传送名单的正确方法。异常仅适用于异常行为,因为您可能希望重试它们来处理失败。
有点起死回生,但我真的没有找到解决这个问题的好方法。
这是我的场景:
- 我想执行一个request/response,但我想等待路由单的执行。
- 作为 Fabio,我想补偿之前的任何活动,并且我想在出现故障时将数据传回请求客户端。
方便的是,Chris 提供了一个 RoutingSlipRequestProxy
/RoutingSlipResponseProxy
,它就是这样做的。我找到了 2 种方法,但它们对我来说都很老套。
方法一:
- 请求客户端等待
ISimpleResponse
或ISimpleFailResponse
。
RoutingSlipRequestProxy
设置变量中的 ResponseAddress
。
- activity发送
ISimpleFailResponse
到ResponseAddress
。
- 客户端等待任一响应
RoutingSlipResponseProxy
将 Fault<ISimpleResponse>
发送回 ResponseAddress
。
据我所知,问题来自步骤 4/5 及其顺序。我很确定它可以工作,但如果消息被乱序使用,它很容易停止工作。
示例代码:https://github.com/steliyan/Sample-RequestResponse/commit/3fcb196804d9db48617a49c7a8f8c276b47b03ef
方法二:
- 请求客户端等待
ISimpleResponse
或ISimpleFailResponse
。
- activity 使用变量调用
ReviseItirery
并添加错误的 activity。*
- 故障activity故障
-
RoutingSlipResponseProxy2
获取 ValidationErrors
并将 ISimpleFailResponse
发送回 ResponseAddress
。
* activity 需要是 Activity
而不是 ExecuteActivity
因为 ReviseItinerary
没有重载变量但没有 activity 日志。
这种方法看起来很老套,因为在行程中添加了一个额外的错误 activity,只是为了能够向路由单中添加一个变量。
示例代码:https://github.com/steliyan/Sample-RequestResponse/commit/e9644fa683255f2bda8ae33d8add742f6ffe3817
结论:
查看 MassTransit 代码,添加 FaultedWithVariables
重载似乎不是问题。不过,我认为 Chris 的观点是应该有更好的方法来设计工作流程,但我不确定。
我目前正在使用 Courier 模式的公共交通。
我已经设置了一个可能会失败的 Activity,我希望能够订阅此失败并采取相应措施。
我的问题是,即使我可以订阅失败,甚至看到导致失败的异常,我也无法向它传递任何参数。
出于测试目的,假设我有以下 activity:
public class MyActivity : ExecuteActivity<MyMessage>
{
public Task<ExecutionResult> Execute(ExecuteContext<MyMessage> context)
{
try
{
// .... some code
throw new FaultException<RegistrationRefusedData>(
new RegistrationRefusedData(RegistrationRefusedReason.ItemUnavailable));
// .... some code
}
catch (Exception ex)
{
return Task.FromResult(context.Faulted(ex));
}
}
}
问题出在我作为异常参数传递的原因 (RegistrationRefusedReason
) 中。如果我订阅了一个 RoutingSlipActivityFaulted
消费者,我几乎可以 获得我需要的所有信息:
public class ActivityFaultedConsumer : IMessageConsumer<RoutingSlipActivityFaulted>
{
public void Consume(RoutingSlipActivityFaulted message)
{
string exceptionMessage = message.ExceptionInfo.Message; // OK
string messageType = message.ExceptionInfo.ExceptionType; // OK
RegistrationRefusedReason reason = ??????;
}
}
我觉得我在这里遗漏了一些重要的东西,(也许误用了模式?)。
是否有任何其他方法可以从故障 activity 获取参数?
因此,您描述的案例不是 Fault
。不符合商业条件是失败的。在这种情况下,您不想重试交易,而是想终止它。要通知传送名单的发起人,您需要 Publish
一个业务事件,表明交易因业务状况而未完成。
例如,在您的情况下,您可以执行以下操作:
context.Publish<RegistrationRefused>(new {
CustomerId = xxx,
ItemId = xxxx,
Reason = "Item was unavailable"
});
context.Terminate();
这将终止路由单(后续活动将不会执行),并产生一个 RoutingSlipTerminated
事件。
这是根据业务条件或规则结束传送名单的正确方法。异常仅适用于异常行为,因为您可能希望重试它们来处理失败。
有点起死回生,但我真的没有找到解决这个问题的好方法。
这是我的场景:
- 我想执行一个request/response,但我想等待路由单的执行。
- 作为 Fabio,我想补偿之前的任何活动,并且我想在出现故障时将数据传回请求客户端。
方便的是,Chris 提供了一个 RoutingSlipRequestProxy
/RoutingSlipResponseProxy
,它就是这样做的。我找到了 2 种方法,但它们对我来说都很老套。
方法一:
- 请求客户端等待
ISimpleResponse
或ISimpleFailResponse
。 RoutingSlipRequestProxy
设置变量中的ResponseAddress
。- activity发送
ISimpleFailResponse
到ResponseAddress
。 - 客户端等待任一响应
RoutingSlipResponseProxy
将Fault<ISimpleResponse>
发送回ResponseAddress
。
据我所知,问题来自步骤 4/5 及其顺序。我很确定它可以工作,但如果消息被乱序使用,它很容易停止工作。
示例代码:https://github.com/steliyan/Sample-RequestResponse/commit/3fcb196804d9db48617a49c7a8f8c276b47b03ef
方法二:
- 请求客户端等待
ISimpleResponse
或ISimpleFailResponse
。 - activity 使用变量调用
ReviseItirery
并添加错误的 activity。* - 故障activity故障
-
RoutingSlipResponseProxy2
获取ValidationErrors
并将ISimpleFailResponse
发送回ResponseAddress
。
* activity 需要是 Activity
而不是 ExecuteActivity
因为 ReviseItinerary
没有重载变量但没有 activity 日志。
这种方法看起来很老套,因为在行程中添加了一个额外的错误 activity,只是为了能够向路由单中添加一个变量。
示例代码:https://github.com/steliyan/Sample-RequestResponse/commit/e9644fa683255f2bda8ae33d8add742f6ffe3817
结论:
查看 MassTransit 代码,添加 FaultedWithVariables
重载似乎不是问题。不过,我认为 Chris 的观点是应该有更好的方法来设计工作流程,但我不确定。