在 MassTransit.RabbitMq 中向 header 添加值

Adding values to header in MassTransit.RabbitMq

我正在使用 MassTransit 3.0.0.0,我很难理解如何在 Request-Response 场景中拦截消息并向 headers 字段添加一些我可以做到的信息在接收者端阅读。

我正在查看 Middleware, as recommended in the MassTransit docs - see Observers warning - but the context you get on the Send is just a Pipe context that doesn't have access to the Headers field so I cannot alter it. I used the sample provided in Middleware 页面。

然后,我查看了 IPublishInterceptor

public class X<T> : IPublishInterceptor<T> where T : class, PipeContext
{
    public Task PostPublish(PublishContext<T> context)
    {
        return new Task(() => { });
    }

    public Task PostSend(PublishContext<T> context, SendContext<T> sendContext)
    {
        return new Task(() => { });
    }

    public Task PrePublish(PublishContext<T> context)
    {
        context.Headers.Set("ID", Guid.NewGuid().ToString());
        return new Task(() => { });
    }

    public Task PreSend(PublishContext<T> context, SendContext<T> sendContext)
    {
        context.Headers.Set("ID", Guid.NewGuid().ToString());
        return new Task(() => { });
    }
}

非常简洁明了。但是,我不知道它在哪里使用以及如何 link 它到基础设施的其余部分。就目前而言,这只是一个接口,并没有真正 link 编辑任何内容。

http://masstransit-project.com/MassTransit/advanced/middleware/custom.html

显示添加扩展方法以明确您的设置。如果它是一个将被大量使用的拦截器,那将是一个很大的帮助,所以这个目的很明确。如果您愿意,可以跳过该步骤。

基本上,只是...

cfg.AddPipeSpecification(new X<MyMessage>());

配置传输时。

如果您需要在发送消息时添加 headers,您可以将中间件组件添加到发送或发布管道,如下所示。请注意,发送过滤器将应用于所有消息,而发布过滤器将仅应用于已发布的消息。

// execute a synchronous delegate on send
cfg.ConfigureSend(x => x.Execute(context => {}));

// execute a synchronous delegate on publish
cfg.ConfigurePublish(x => x.Execute(context => {}));

中间件可以在总线或单独的接收端点上配置,并且这些配置在配置它的地方是本地的。

你也可以在消费者中添加headers class:

public async Task Consume(ConsumeContext<MyMessage> context)
{
    ....
    await context.Publish<MyEvent>(new { Data = data }, c => AddHeaders(c));
}

public static void AddHeaders(PublishContext context)
{
    context.Headers.Set("CausationId", context.MessageId);
}