Rebus 消息处理管道,发送消息前后,处理消息前后,如何添加中间件?

How do I add middleware to the Rebus message processing pipeline, before and after sending a message, and before and after handling a message?

我需要它来简化以下典型例行操作的实施:

  1. 我想在发送消息之前捕获用户的上下文并在处理消息之前恢复用户上下文,类似于以下遗留示例中的操作方式:https://github.com/rebus-org/RebusSamples/tree/master/old/UserContextHeaders

  2. 我想在处理消息之前验证和删除重复消息,并在处理消息后记录结果。

正如问题作者正确理解的那样,Rebus.Events 包提供了可读且可访问的挂钩 before/after 消息的方法是 sent/received。

如果足够的话,我肯定会接受。

但是,如果你想将单个消息的整个处理过程包装在 try/finally 中(当你恢复发送用户的身份来处理消息时,我建议你这样做),你可能想看看本机扩展机制,它基于关于装饰器。

您可以阅读the wiki page about extensibility了解如何通过装饰管道来扩展 Rebus。

例如,要在处理消息之前和之后对当前声明主体执行某些操作,您可以像这样实现“传入管道步骤”:

[StepDocumentation("Write a nice descriptoion here")]
class MyIncomingStep : IIncomingStep
{
    public async Task Process(IncomingStepContext context, Func<Task> next)
    {
        var originalPrincipal = ClaimsPrincipal.Current;

        try 
        {
            // establish user identity here
            ClaimsPrincipal.Current = ...

            // handle message
            await next();
        }
        finally 
        {
            ClaimsPrincipal.Current = originalPrincipal;
        }
    }
}

然后您可以用“步骤注入器”装饰 Rebus 的 IPipeline,声明性地说明您希望在管道中插入步骤的位置:

.Options(o => {
    o.Decorate<IPipeline>(c =>
    {
        var pipeline = c.Get<IPipeline>();
        var stepToInject = new MyIncomingStep();

        return new PipelineStepInjector(pipeline)
            .OnReceive(stepToInject, PipelineRelativePosition.Before, typeof(DispatchIncomingMessageStep));
    });    
})

然后 - 为了使事情变得漂亮 - 您可以将上面的代码包装在 OptionsConfigurer 的扩展方法中,从而获得更漂亮的配置语法:

.Options(o => {
    o.RestoreClaimsPrincipalWhenHandlingMessages();
})

或者任何你认为应该叫它的名字:)

发送消息时一切都以类似的方式工作,您只需要

//....
return new PipelineStepInjector(pipeline)
     .OnSend(stepToInject, PipelineRelativePosition.Before, typeof(SerializeOutgoingMessageStep));

相反。