在使用 NServiceBus 触发 saga 处理程序之前可以操作 SagaData
Can SagaData be manipulated before the saga handler fires using NServiceBus
使用 NServiceBus (v6),有没有办法确保在触发消息的 Saga 处理程序之前在 SagaData object 中设置 属性?
我们的环境是 multi-tenant,所以我想确保将正确的 CustomerId 用于数据库访问等。并且开发人员不要忘记从传入的 message/message header.
例如给定这个 saga 数据...
public interface ICustomerSagaData : IContainSagaData
{
Guid CustomerId { get; set; }
}
public class SomeProcessSagaData : ICustomerSagaData
{
// IContainSagaData and other properties removed for brevity ...
#region ICustomerSagaData properties
public virtual Guid CustomerId { get; set; }
#endregion
}
...以及以下 Saga ...
public class SomeProcessSagaSaga :
Saga<SomeProcessSagaData>,
IAmStartedByMessages<StartProcess>
{
public async Task Handle(StartProcess message, IMessageHandlerContext context)
{
// How do I ensure that Data.CustomerId is already set at this point?
}
// ConfigureHowToFindSaga etc ...
}
我最初尝试将一个行为插入到管道中,例如
public class MyInvokeHandlerBehavior : Behavior<IInvokeHandlerContext>
{
public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
{
// Ideally I'd like to set the CustomerId here before the
// Saga Handler is invoked but calls to ...
// context.Extensions.TryGet(out activeSagaInstance);
// return a null activeSagaInstance
await next().ConfigureAwait(false);
// This is the only point I can get the saga data object but
// as mentioned above the hander has already been invoked
ActiveSagaInstance activeSagaInstance;
if (context.Extensions.TryGet(out activeSagaInstance))
{
var instance = activeSagaInstance.Instance.Entity as ICustomerSagaData;
if (instance != null)
{
Guid customerId;
if (Guid.TryParse(context.Headers["CustomerId"), out customerId))
{
instance.CustomerId = customerId;
}
}
}
}
}
...但这只允许访问 SagaData 实例在处理程序被解雇后。
因此直接回答您的问题 Data.CustomerId 不会在您处理 StartProcess 消息时设置。您需要使用消息中的 ID 来设置它。
public async Task Handle(StartProcess message, IMessageHandlerContext context)
{
Data.CustomerId = message.CustomerId;
}
话虽如此,您上面的示例缺少一个关键部分,即确定如何查找 saga 以继续处理的代码:
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SomeProcessSagaData> mapper)
{
mapper.ConfigureMapping<StartProcess>(message => message.CustomerId)
.ToSaga(sagaData => sagaData.CustomerId);
}
每次发送由 saga 处理的消息类型时,您需要配置 ConfigureHowToFindSaga()
方法,以便它可以查找之前启动的 saga 以继续处理。因此,从本质上讲,您将 为您使用 StartProcess
消息发送的每个 customerid 开始一个新的 saga。您可以在这里阅读更多相关信息:https://docs.particular.net/nservicebus/sagas/
所以现在真正的问题是此时您真的需要使用 saga 吗?该示例似乎只处理一种类型的消息,所以您真的需要保存 CustomerId 的状态吗?在您的示例中,传奇的开销不是必需的,我相信根据上面的示例,常规处理程序就可以了。
迟到的答案,但您需要确保您的行为在 SagaPersistenceBehavior 之后执行。
在您的 IConfigureThisEndpoint 实现中:
public virtual void Customize(EndpointConfiguration configuration)
{
configuration.Pipeline.Register<Registration>();
}
public class Registration : RegisterStep
{
public Registration()
: base(
stepId: "AuditMutator",
behavior: typeof(AuditMutator),
description: "Sets up for auditing")
{
this.InsertAfterIfExists("InvokeSaga");
}
}
使用 NServiceBus (v6),有没有办法确保在触发消息的 Saga 处理程序之前在 SagaData object 中设置 属性?
我们的环境是 multi-tenant,所以我想确保将正确的 CustomerId 用于数据库访问等。并且开发人员不要忘记从传入的 message/message header.
例如给定这个 saga 数据...
public interface ICustomerSagaData : IContainSagaData
{
Guid CustomerId { get; set; }
}
public class SomeProcessSagaData : ICustomerSagaData
{
// IContainSagaData and other properties removed for brevity ...
#region ICustomerSagaData properties
public virtual Guid CustomerId { get; set; }
#endregion
}
...以及以下 Saga ...
public class SomeProcessSagaSaga :
Saga<SomeProcessSagaData>,
IAmStartedByMessages<StartProcess>
{
public async Task Handle(StartProcess message, IMessageHandlerContext context)
{
// How do I ensure that Data.CustomerId is already set at this point?
}
// ConfigureHowToFindSaga etc ...
}
我最初尝试将一个行为插入到管道中,例如
public class MyInvokeHandlerBehavior : Behavior<IInvokeHandlerContext>
{
public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
{
// Ideally I'd like to set the CustomerId here before the
// Saga Handler is invoked but calls to ...
// context.Extensions.TryGet(out activeSagaInstance);
// return a null activeSagaInstance
await next().ConfigureAwait(false);
// This is the only point I can get the saga data object but
// as mentioned above the hander has already been invoked
ActiveSagaInstance activeSagaInstance;
if (context.Extensions.TryGet(out activeSagaInstance))
{
var instance = activeSagaInstance.Instance.Entity as ICustomerSagaData;
if (instance != null)
{
Guid customerId;
if (Guid.TryParse(context.Headers["CustomerId"), out customerId))
{
instance.CustomerId = customerId;
}
}
}
}
}
...但这只允许访问 SagaData 实例在处理程序被解雇后。
因此直接回答您的问题 Data.CustomerId 不会在您处理 StartProcess 消息时设置。您需要使用消息中的 ID 来设置它。
public async Task Handle(StartProcess message, IMessageHandlerContext context)
{
Data.CustomerId = message.CustomerId;
}
话虽如此,您上面的示例缺少一个关键部分,即确定如何查找 saga 以继续处理的代码:
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SomeProcessSagaData> mapper)
{
mapper.ConfigureMapping<StartProcess>(message => message.CustomerId)
.ToSaga(sagaData => sagaData.CustomerId);
}
每次发送由 saga 处理的消息类型时,您需要配置 ConfigureHowToFindSaga()
方法,以便它可以查找之前启动的 saga 以继续处理。因此,从本质上讲,您将 为您使用 StartProcess
消息发送的每个 customerid 开始一个新的 saga。您可以在这里阅读更多相关信息:https://docs.particular.net/nservicebus/sagas/
所以现在真正的问题是此时您真的需要使用 saga 吗?该示例似乎只处理一种类型的消息,所以您真的需要保存 CustomerId 的状态吗?在您的示例中,传奇的开销不是必需的,我相信根据上面的示例,常规处理程序就可以了。
迟到的答案,但您需要确保您的行为在 SagaPersistenceBehavior 之后执行。
在您的 IConfigureThisEndpoint 实现中:
public virtual void Customize(EndpointConfiguration configuration)
{
configuration.Pipeline.Register<Registration>();
}
public class Registration : RegisterStep
{
public Registration()
: base(
stepId: "AuditMutator",
behavior: typeof(AuditMutator),
description: "Sets up for auditing")
{
this.InsertAfterIfExists("InvokeSaga");
}
}