事项消息转换器

Rebus Message Mutator

Rebus 曾一度支持 Message Mutators。我似乎无法在 Rebus 源代码中找到它们了。他们改名了吗?它们还存在吗?

示例代码:

Configure.With(senderAdapter)
    .Transport(t => t.UseMsmq(SenderInputQueueName, "error"))
    .Events(e =>
        {
            e.MessageMutators.Add(new EvilMutator("first"));
            e.MessageMutators.Add(new EvilMutator("second"));
            e.MessageMutators.Add(new EvilMutator("third"));
        })
    .CreateBus().Start();

“Rebus 2”(自 0.90.0 以来的所有 Rebus 版本)没有消息修改器,因为它是超级可扩展的,并且使用 [=23 添加一些改变 incoming/outgoing 消息的东西非常容易=]管道。

管道遵循“俄罗斯套娃”模型,其中每个步骤负责调用管道的其余部分。

可以像这样添加一个新的“mutator”步骤——首先,我们创建一个能够改变 incoming/outgoing 消息的步骤:

public class MyMutatorStep : IIncomingStep, IOutgoingStep
{
    public async Task Process(OutgoingStepContext context, Func<Task> next)
    {
        // here we have the message
        var message = context.Load<Message>();

        // mutate (or, more like "cripple", actually )
        context.Save(new Message(headers: message.Headers, body: new object()));

        await next();
    }

    public async Task Process(IncomingStepContext context, Func<Task> next)
    {
        // here we have the message again
        var message = context.Load<Message>();

        await next();
    }
}

然后我们装饰管道,分别注入serialization/after反序列化之前的步骤:

Configure.With(...)
    .(...)
    .Options(o => o.Decorate<IPipeline>(c => {
        var pipeline = c.Get<IPipeline>();
        var step = new MyMutatorStep();

        return new PipelineStepInjector(pipeline)
            .OnReceive(step, PipelineStepRelativePosition.After, typeof(DeserializeIncomingMessageStep))
            .OnSend(step, PipelineStepRelativePosition.Before, typeof(SerializeOutgoingMessageStep));
    }))
    .Start();

在这个例子中,我通过用 new object() 替换消息正文来改变外发消息,这可能不是您想要的,但希望您能了解可能性。