如何在使用“IBusControl”发布消息时在 MassTransit 消费者之间共享上下文数据?

How to share Context data between MassTransit Consumers while using the `IBusControl` for publishing messages?

我在带有 RabbitMQ(用于本地开发)和 SQS(用于部署)的 .NET Core 应用程序中使用 MassTransit 7.2.2,其中单个消息处理可能导致创建和处理多个新消息。

所有消息共享相同的基类型

public class BaseMessage : CorrelatedBy<Guid>
{
    public BaseMessage()
    {
        CorrelationId = Guid.NewGuid();
        CreationDate = DateTime.UtcNow;
    }
    public Guid CorrelationId { get; init; }
    public DateTime CreationDate { get; }
    public Guid? ConversationId { get; set; }
}

所有消息的基本处理流程都相同,每个消费者都有一个服务。

public class FirstMessage : BaseMessage
{

}

public class FirstConsumer : IConsumer<FirstMessage>
{
    private readonly ILogger<FirstConsumer> _logger;
    private readonly FirstService _service;

    public FirstConsumer(ILogger<FirstConsumer> logger, FirstService service)
    {
        _logger = logger;
        _service = service;
    }

    public Task Consume(ConsumeContext<FirstMessage> context)
    {
        _logger.LogInformation($"FirstConsumer CorrelationId: {context.CorrelationId} and ConversationId: {context.ConversationId} and InitiatorId: {context.InitiatorId}");
        _service.Process(context.Message);
        return Task.CompletedTask;
    }
}

public class FirstService
{
    private readonly IBusControl _busControl;
    private readonly ILogger<FirstService> _logger;

    public FirstService(IBusControl busControl, ILogger<FirstService> logger)
    {
        _busControl = busControl;
        _logger = logger;
    }

    public Task Process(FirstMessage firstMessage)
    {
        var secondMessage = new SecondMessage();
        _busControl.Publish(secondMessage);
        return Task.CompletedTask;
    }
}

上面的代码是一个例子,实际的代码库有 30 多个消费者,并且都有相同的模式,即每个消费者有一个服务,消息被传递给服务进行处理。

我正在尝试通过使用 ID 实施端到端跟踪消息的解决方案。

  1. ConversationId - 用于跟踪图表中所有消费者日志的唯一 ID
  2. CorrelationId - 用于在消费者中跟踪日志的唯一 ID
  3. InitiatorId - Parent Id

有一个消息处理图看起来像

FirstConsumer -> SecondConsumer -> ThirdConsumer.

我有以下过滤器

ConsumeFilter

public class SimpleConsumeMessageFilter<TContext, TMessage> : IFilter<TContext>
    where TContext : class, ConsumeContext<TMessage>
    where TMessage : class
{
    public SimpleConsumeMessageFilter()
    {

    }

    public async Task Send(TContext context, IPipe<TContext> next)
    {
        LogContext.PushProperty("CorrelationId", context.CorrelationId);
        LogContext.PushProperty("ConversationId", context.ConversationId);
        LogContext.PushProperty("InitiatorId", context.InitiatorId);
        await next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
        context.CreateScope("consume-filter");
    }
}

public class SimpleConsumeMessagePipeSpec<TConsumer, TMessage> : IPipeSpecification<ConsumerConsumeContext<TConsumer, TMessage>>
    where TConsumer : class
    where TMessage : class
{
    public void Apply(IPipeBuilder<ConsumerConsumeContext<TConsumer, TMessage>> builder)
    {
        builder.AddFilter(new SimpleConsumeMessageFilter<ConsumerConsumeContext<TConsumer, TMessage>, TMessage>());
    }

    public IEnumerable<ValidationResult> Validate()
    {
        return Enumerable.Empty<ValidationResult>();
    }
}

public class SimpleConsumePipeSpecObserver : IConsumerConfigurationObserver
{
    public void ConsumerConfigured<TConsumer>(IConsumerConfigurator<TConsumer> configurator)
        where TConsumer : class
    {
        
    }

    public void ConsumerMessageConfigured<TConsumer, TMessage>(IConsumerMessageConfigurator<TConsumer, TMessage> configurator)
        where TConsumer : class
        where TMessage : class
    {
        configurator.AddPipeSpecification(new SimpleConsumeMessagePipeSpec<TConsumer, TMessage>());
    }
}

PublishFilter

public class SimplePublishMessageFilter<TMessage> : IFilter<PublishContext<TMessage>> where TMessage : class
{
    public SimplePublishMessageFilter()
    {

    }

    public async Task Send(PublishContext<TMessage> context, IPipe<PublishContext<TMessage>> next)
    {

        if (context.Headers.TryGetHeader("ConversationId", out object @value))
        {
            var conversationId = Guid.Parse(@value.ToString());
            context.ConversationId = conversationId;
        }
        else
        {
            if (context.Message is BaseMessage baseEvent && !context.ConversationId.HasValue)
            {
                context.ConversationId = baseEvent.ConversationId ?? Guid.NewGuid();
                context.Headers.Set("ConversationId", context.ConversationId.ToString());
            }
        }
        await next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
        context.CreateScope("publish-filter");
    }
}

public class SimplePublishMessagePipeSpec<TMessage> : IPipeSpecification<PublishContext<TMessage>> where TMessage : class
{
    public void Apply(IPipeBuilder<PublishContext<TMessage>> builder)
    {
        builder.AddFilter(new SimplePublishMessageFilter<TMessage>());
    }

    public IEnumerable<ValidationResult> Validate()
    {
        return Enumerable.Empty<ValidationResult>();
    }
}

public class SimplePublishPipeSpecObserver : IPublishPipeSpecificationObserver
{
    public void MessageSpecificationCreated<TMessage>(IMessagePublishPipeSpecification<TMessage> specification)
        where TMessage : class
    {
        specification.AddPipeSpecification(new SimplePublishMessagePipeSpec<TMessage>());
    }
}

Added to config via

x.UsingRabbitMq((context, cfg) =>
            {
                cfg.ConnectConsumerConfigurationObserver(new SimpleConsumePipeSpecObserver());
                cfg.ConfigurePublish(ppc =>
                {
                    ppc.ConnectPublishPipeSpecificationObserver(new SimplePublishPipeSpecObserver());
                });
                cfg.UseDelayedMessageScheduler();
                cfg.ConfigureEndpoints(context);
                cfg.Host("localhost", rmq =>
                {
                    rmq.Username("guest");
                    rmq.Password("guest");
                });
            });

使用上述方法,当 SecondConsumer 的过滤器为 运行.

时,'CorrelationId' header 会丢失

我尝试了以下更改,它似乎使 ID 流过消费者。 但是,采用这种方法将影响依赖于 IBusControl 接口的大部分代码/测试。我将其保留为备用选项,以防找不到任何其他解决方案。

public class FirstService
{
    private readonly ILogger<FirstService> _logger;

    public FirstService(ILogger<FirstService> logger)
    {            
        _logger = logger;
    }

    public Task Process( ConsumeContext<FirstMessage> consumeContext)
    {
        var secondMessage = new SecondMessage();
        consumeContext.Publish(secondMessage);
        return Task.CompletedTask;
    }
}

问题:有没有办法在使用IBusControl发送/发布消息时在消费者之间共享上下文数据?

非常感谢

作为 explained in the documentation,消费者(及其依赖项)在 sending/publishing 消息时必须使用以下之一:

  • ConsumeContext,通常在消费者内部
  • IPublishEndpointISendEndpointProvider,通常由消费者 scoped 依赖项使用
  • IBus,不得已,因为所有上下文数据都从入站消息中丢失

关于您的最后一个问题,“有没有办法在使用 IBusControl 发送/发布消息时在消费者之间共享上下文数据?”答案是不。需要使用上下文来访问任何上下文数据。