在 MassTransit.RabbitMq 中向 header 添加值

Adding values to header in MassTransit.RabbitMq

我正在使用 MassTransit,我很难理解如何在 Request-Response 场景中拦截消息并向 headers 字段添加一些我可以做到的信息在接收者端阅读。

我正在查看 Middleware, as recommended in the MassTransit docs - see Observers warning - but the context you get on the Send is just a Pipe context that doesn't have access to the Headers field so I cannot alter it. I used the sample provided in Middleware 页面。

然后,我查看了 IPublishInterceptor

public class X<T> : IPublishInterceptor<T> where T : class, PipeContext
    public Task PostPublish(PublishContext<T> context)
        return new Task(() => { });

    public Task PostSend(PublishContext<T> context, SendContext<T> sendContext)
        return new Task(() => { });

    public Task PrePublish(PublishContext<T> context)
        context.Headers.Set("ID", Guid.NewGuid().ToString());
        return new Task(() => { });

    public Task PreSend(PublishContext<T> context, SendContext<T> sendContext)
        context.Headers.Set("ID", Guid.NewGuid().ToString());
        return new Task(() => { });

非常简洁明了。但是,我不知道它在哪里使用以及如何 link 它到基础设施的其余部分。就目前而言,这只是一个接口,并没有真正 link 编辑任何内容。




cfg.AddPipeSpecification(new X<MyMessage>());


如果您需要在发送消息时添加 headers,您可以将中间件组件添加到发送或发布管道,如下所示。请注意,发送过滤器将应用于所有消息,而发布过滤器将仅应用于已发布的消息。

// execute a synchronous delegate on send
cfg.ConfigureSend(x => x.Execute(context => {}));

// execute a synchronous delegate on publish
cfg.ConfigurePublish(x => x.Execute(context => {}));


你也可以在消费者中添加headers class:

public async Task Consume(ConsumeContext<MyMessage> context)
    await context.Publish<MyEvent>(new { Data = data }, c => AddHeaders(c));

public static void AddHeaders(PublishContext context)
    context.Headers.Set("CausationId", context.MessageId);