在使用 NServiceBus 触发 saga 处理程序之前可以操作 SagaData

Can SagaData be manipulated before the saga handler fires using NServiceBus

使用 NServiceBus (v6),有没有办法确保在触发消息的 Saga 处理程序之前在 SagaData object 中设置 属性?

我们的环境是 multi-tenant,所以我想确保将正确的 CustomerId 用于数据库访问等。并且开发人员不要忘记从传入的 message/message header.

例如给定这个 saga 数据...

public interface ICustomerSagaData : IContainSagaData
{
    Guid CustomerId { get; set; }
}

public class SomeProcessSagaData : ICustomerSagaData
{
    // IContainSagaData and other properties removed for brevity ...

    #region ICustomerSagaData properties

    public virtual Guid CustomerId { get; set; }

    #endregion
}

...以及以下 Saga ...

public class  SomeProcessSagaSaga :
    Saga<SomeProcessSagaData>,
    IAmStartedByMessages<StartProcess>
{
    public async Task Handle(StartProcess message, IMessageHandlerContext context)
    {
        // How do I ensure that Data.CustomerId is already set at this point?

    }

    // ConfigureHowToFindSaga etc ...
}

我最初尝试将一个行为插入到管道中,例如

public class MyInvokeHandlerBehavior : Behavior<IInvokeHandlerContext>
{
    public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
    {
        // Ideally I'd like to set the CustomerId here before the 
        // Saga Handler is invoked but calls to ...     
        //      context.Extensions.TryGet(out activeSagaInstance);
        // return a null activeSagaInstance

        await next().ConfigureAwait(false);

        // This is the only point I can get the saga data object but
        //  as mentioned above the hander has already been invoked
        ActiveSagaInstance activeSagaInstance;
        if (context.Extensions.TryGet(out activeSagaInstance))
        {
            var instance = activeSagaInstance.Instance.Entity as ICustomerSagaData;
            if (instance != null)
            {
                Guid customerId;
                if (Guid.TryParse(context.Headers["CustomerId"), out customerId))
                {
                    instance.CustomerId = customerId; 
                }
            }
        }
    }
}

...但这只允许访问 SagaData 实例处理程序被解雇后。

因此直接回答您的问题 Data.CustomerId 不会在您处理 StartProcess 消息时设置。您需要使用消息中的 ID 来设置它。

public async Task Handle(StartProcess message, IMessageHandlerContext context)
{
    Data.CustomerId = message.CustomerId;
}

话虽如此,您上面的示例缺少一个关键部分,即确定如何查找 saga 以继续处理的代码:

protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SomeProcessSagaData> mapper)
{
    mapper.ConfigureMapping<StartProcess>(message => message.CustomerId)
        .ToSaga(sagaData => sagaData.CustomerId);
}

每次发送由 saga 处理的消息类型时,您需要配置 ConfigureHowToFindSaga() 方法,以便它可以查找之前启动的 saga 以继续处理。因此,从本质上讲,您将 为您使用 StartProcess 消息发送的每个 customerid 开始一个新的 saga。您可以在这里阅读更多相关信息:https://docs.particular.net/nservicebus/sagas/

所以现在真正的问题是此时您真的需要使用 saga 吗?该示例似乎只处理一种类型的消息,所以您真的需要保存 CustomerId 的状态吗?在您的示例中,传奇的开销不是必需的,我相信根据上面的示例,常规处理程序就可以了。

迟到的答案,但您需要确保您的行为在 SagaPersistenceBehavior 之后执行。

在您的 IConfigureThisEndpoint 实现中:

    public virtual void Customize(EndpointConfiguration configuration)
    {
        configuration.Pipeline.Register<Registration>();
    }

    public class Registration : RegisterStep
    {
        public Registration()
            : base(
                stepId: "AuditMutator",
                behavior: typeof(AuditMutator),
                description: "Sets up for auditing")
        {
            this.InsertAfterIfExists("InvokeSaga");
        }
    }