如何在使用“IBusControl”发布消息时在 MassTransit 消费者之间共享上下文数据?
How to share Context data between MassTransit Consumers while using the `IBusControl` for publishing messages?
我在带有 RabbitMQ(用于本地开发)和 SQS(用于部署)的 .NET Core 应用程序中使用 MassTransit 7.2.2,其中单个消息处理可能导致创建和处理多个新消息。
所有消息共享相同的基类型
/// <summary>
/// Common base type for all messages. Assigns a fresh <see cref="CorrelationId"/> and a UTC
/// <see cref="CreationDate"/> when constructed.
/// </summary>
public class BaseMessage : CorrelatedBy<Guid>
{
    /// <summary>
    /// Initializes the message with a new correlation id and the current UTC time.
    /// </summary>
    public BaseMessage()
    {
        CorrelationId = Guid.NewGuid();
        CreationDate = DateTime.UtcNow;
    }

    /// <summary>Unique id of this message; defaults to a new Guid.</summary>
    public Guid CorrelationId { get; init; }

    /// <summary>UTC time at which the message object was created.</summary>
    /// <remarks>
    /// Declared with <c>init</c> (like <see cref="CorrelationId"/>) so the value can be assigned
    /// during object initialization. NOTE(review): a get-only property is typically not populated
    /// by JSON deserializers, which would leave the receiver with the deserialization time
    /// instead of the original creation time - confirm against the configured serializer.
    /// </remarks>
    public DateTime CreationDate { get; init; }

    /// <summary>Optional id shared by all messages of one conversation.</summary>
    public Guid? ConversationId { get; set; }
}
所有消息的基本处理流程都相同,每个消费者都有一个服务。
/// <summary>
/// Message handled by FirstConsumer; adds no members beyond those of <see cref="BaseMessage"/>.
/// </summary>
public class FirstMessage : BaseMessage { }
/// <summary>
/// Consumes <see cref="FirstMessage"/>: logs the tracing ids of the incoming context and
/// hands the message to <see cref="FirstService"/>.
/// </summary>
public class FirstConsumer : IConsumer<FirstMessage>
{
    private readonly ILogger<FirstConsumer> _logger;
    private readonly FirstService _service;

    /// <param name="logger">Logger used to record the tracing ids.</param>
    /// <param name="service">Service that processes the message.</param>
    public FirstConsumer(ILogger<FirstConsumer> logger, FirstService service)
    {
        _logger = logger;
        _service = service;
    }

    /// <summary>
    /// Logs CorrelationId, ConversationId and InitiatorId, then processes the message.
    /// </summary>
    /// <param name="context">Consume context of the incoming message.</param>
    /// <returns>A task that completes when the service has finished processing.</returns>
    public async Task Consume(ConsumeContext<FirstMessage> context)
    {
        _logger.LogInformation($"FirstConsumer CorrelationId: {context.CorrelationId} and ConversationId: {context.ConversationId} and InitiatorId: {context.InitiatorId}");

        // Await the service so the consumer does not complete before processing has finished
        // and so that exceptions from the service surface here instead of being dropped.
        await _service.Process(context.Message);
    }
}
/// <summary>
/// Processes a <see cref="FirstMessage"/> by publishing a new SecondMessage on the bus.
/// </summary>
public class FirstService
{
    private readonly IBusControl _busControl;
    private readonly ILogger<FirstService> _logger;

    /// <param name="busControl">Bus used to publish follow-up messages.</param>
    /// <param name="logger">Logger for this service.</param>
    public FirstService(IBusControl busControl, ILogger<FirstService> logger)
    {
        _busControl = busControl;
        _logger = logger;
    }

    /// <summary>
    /// Publishes a new SecondMessage.
    /// </summary>
    /// <param name="firstMessage">The message being processed (not read by this method).</param>
    /// <returns>A task that completes when the publish has completed.</returns>
    public async Task Process(FirstMessage firstMessage)
    {
        var secondMessage = new SecondMessage();

        // Await the publish instead of discarding its task, so failures are observed by the caller.
        await _busControl.Publish(secondMessage);
    }
}
上面的代码是一个例子,实际的代码库有 30 多个消费者,并且都有相同的模式,即每个消费者有一个服务,消息被传递给服务进行处理。
我正在尝试通过使用 ID 实施端到端跟踪消息的解决方案。
- ConversationId - 用于跟踪图表中所有消费者日志的唯一 ID
- CorrelationId - 用于在消费者中跟踪日志的唯一 ID
- InitiatorId - Parent Id
有一个消息处理图看起来像
FirstConsumer -> SecondConsumer -> ThirdConsumer.
我有以下过滤器
ConsumeFilter
/// <summary>
/// Consume filter that pushes the CorrelationId, ConversationId and InitiatorId of the
/// consume context as log properties before invoking the rest of the pipe.
/// </summary>
/// <typeparam name="TContext">Consume context type flowing through the pipe.</typeparam>
/// <typeparam name="TMessage">Message type carried by the context.</typeparam>
public class SimpleConsumeMessageFilter<TContext, TMessage> : IFilter<TContext>
    where TContext : class, ConsumeContext<TMessage>
    where TMessage : class
{
    public SimpleConsumeMessageFilter()
    {
    }

    /// <summary>
    /// Pushes the three tracing ids, runs the next filter, then removes the properties again.
    /// </summary>
    /// <param name="context">The current consume context.</param>
    /// <param name="next">The remainder of the pipe.</param>
    public async Task Send(TContext context, IPipe<TContext> next)
    {
        // Dispose the handles returned by PushProperty so the properties are removed once the
        // downstream pipe has completed (or thrown), instead of being left pushed.
        // NOTE(review): assumes LogContext is Serilog.Context.LogContext, whose PushProperty
        // returns an IDisposable - confirm against the file's usings.
        using (LogContext.PushProperty("CorrelationId", context.CorrelationId))
        using (LogContext.PushProperty("ConversationId", context.ConversationId))
        using (LogContext.PushProperty("InitiatorId", context.InitiatorId))
        {
            await next.Send(context);
        }
    }

    /// <summary>Adds a "consume-filter" scope to the probe output.</summary>
    public void Probe(ProbeContext context)
    {
        context.CreateScope("consume-filter");
    }
}
/// <summary>
/// Pipe specification that adds a <c>SimpleConsumeMessageFilter</c> to the consume pipe of a
/// consumer/message pair.
/// </summary>
/// <typeparam name="TConsumer">Consumer type.</typeparam>
/// <typeparam name="TMessage">Message type.</typeparam>
public class SimpleConsumeMessagePipeSpec<TConsumer, TMessage> : IPipeSpecification<ConsumerConsumeContext<TConsumer, TMessage>>
    where TConsumer : class
    where TMessage : class
{
    /// <summary>Adds a new filter instance to the pipe being built.</summary>
    public void Apply(IPipeBuilder<ConsumerConsumeContext<TConsumer, TMessage>> builder)
    {
        var loggingFilter = new SimpleConsumeMessageFilter<ConsumerConsumeContext<TConsumer, TMessage>, TMessage>();
        builder.AddFilter(loggingFilter);
    }

    /// <summary>Reports no validation results.</summary>
    public IEnumerable<ValidationResult> Validate() => Enumerable.Empty<ValidationResult>();
}
/// <summary>
/// Consumer configuration observer that adds a <c>SimpleConsumeMessagePipeSpec</c> to each
/// consumer/message configurator it is notified about.
/// </summary>
public class SimpleConsumePipeSpecObserver : IConsumerConfigurationObserver
{
    /// <summary>Does nothing at the consumer level.</summary>
    public void ConsumerConfigured<TConsumer>(IConsumerConfigurator<TConsumer> configurator)
        where TConsumer : class
    {
        // Intentionally empty: the specification is attached per message type below.
    }

    /// <summary>Adds the consume pipe specification for this consumer/message pair.</summary>
    public void ConsumerMessageConfigured<TConsumer, TMessage>(IConsumerMessageConfigurator<TConsumer, TMessage> configurator)
        where TConsumer : class
        where TMessage : class
    {
        var pipeSpec = new SimpleConsumeMessagePipeSpec<TConsumer, TMessage>();
        configurator.AddPipeSpecification(pipeSpec);
    }
}
PublishFilter
/// <summary>
/// Publish filter that makes sure a ConversationId is set on the publish context, taking it
/// from the "ConversationId" header when present, otherwise from the message itself.
/// </summary>
/// <typeparam name="TMessage">Type of the message being published.</typeparam>
public class SimplePublishMessageFilter<TMessage> : IFilter<PublishContext<TMessage>> where TMessage : class
{
    public SimplePublishMessageFilter()
    {
    }

    /// <summary>
    /// Resolves the ConversationId for the outgoing message and then invokes the rest of the pipe.
    /// </summary>
    /// <param name="context">The publish context of the outgoing message.</param>
    /// <param name="next">The remainder of the pipe.</param>
    public async Task Send(PublishContext<TMessage> context, IPipe<PublishContext<TMessage>> next)
    {
        // TryParse instead of Parse: a null or malformed header value must not abort the publish
        // with a NullReferenceException/FormatException; it is treated like a missing header.
        if (context.Headers.TryGetHeader("ConversationId", out object headerValue)
            && Guid.TryParse(headerValue?.ToString(), out Guid conversationId))
        {
            context.ConversationId = conversationId;
        }
        else if (context.Message is BaseMessage baseEvent && !context.ConversationId.HasValue)
        {
            // No usable header: take the id carried by the message, or start a new conversation,
            // and record it in the header.
            context.ConversationId = baseEvent.ConversationId ?? Guid.NewGuid();
            context.Headers.Set("ConversationId", context.ConversationId.ToString());
        }

        await next.Send(context);
    }

    /// <summary>Adds a "publish-filter" scope to the probe output.</summary>
    public void Probe(ProbeContext context)
    {
        context.CreateScope("publish-filter");
    }
}
/// <summary>
/// Pipe specification that adds a <c>SimplePublishMessageFilter</c> to the publish pipe of a
/// message type.
/// </summary>
/// <typeparam name="TMessage">Type of the message being published.</typeparam>
public class SimplePublishMessagePipeSpec<TMessage> : IPipeSpecification<PublishContext<TMessage>> where TMessage : class
{
    /// <summary>Adds a new filter instance to the pipe being built.</summary>
    public void Apply(IPipeBuilder<PublishContext<TMessage>> builder)
    {
        var conversationFilter = new SimplePublishMessageFilter<TMessage>();
        builder.AddFilter(conversationFilter);
    }

    /// <summary>Reports no validation results.</summary>
    public IEnumerable<ValidationResult> Validate() => Enumerable.Empty<ValidationResult>();
}
/// <summary>
/// Publish pipe observer that adds a <c>SimplePublishMessagePipeSpec</c> to every message
/// publish specification it is notified about.
/// </summary>
public class SimplePublishPipeSpecObserver : IPublishPipeSpecificationObserver
{
    /// <summary>Adds the publish pipe specification for the given message type.</summary>
    public void MessageSpecificationCreated<TMessage>(IMessagePublishPipeSpecification<TMessage> specification)
        where TMessage : class
    {
        var pipeSpec = new SimplePublishMessagePipeSpec<TMessage>();
        specification.AddPipeSpecification(pipeSpec);
    }
}
Added to config via
x.UsingRabbitMq((registrationContext, rabbit) =>
{
    // Hook the consume-side observer so each configured consumer/message pair gets the logging filter.
    rabbit.ConnectConsumerConfigurationObserver(new SimpleConsumePipeSpecObserver());

    // Hook the publish-side observer so each published message type gets the ConversationId filter.
    rabbit.ConfigurePublish(publish =>
    {
        publish.ConnectPublishPipeSpecificationObserver(new SimplePublishPipeSpecObserver());
    });

    rabbit.UseDelayedMessageScheduler();
    rabbit.ConfigureEndpoints(registrationContext);

    // Broker on localhost with the guest/guest account.
    rabbit.Host("localhost", host =>
    {
        host.Username("guest");
        host.Password("guest");
    });
});
使用上述方法,当 SecondConsumer 的过滤器运行时,'CorrelationId' header 会丢失。
我尝试了以下更改,它似乎使 ID 流过消费者。
但是,采用这种方法将影响依赖于 IBusControl
接口的大部分代码/测试。我将其保留为备用选项,以防找不到任何其他解决方案。
/// <summary>
/// Variant of the service that publishes through the consumer's <c>ConsumeContext</c> instead
/// of the bus.
/// </summary>
public class FirstService
{
    private readonly ILogger<FirstService> _logger;

    /// <param name="logger">Logger for this service.</param>
    public FirstService(ILogger<FirstService> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Publishes a new SecondMessage through the supplied consume context.
    /// </summary>
    /// <param name="consumeContext">Consume context of the message being processed.</param>
    /// <returns>A task that completes when the publish has completed.</returns>
    public async Task Process(ConsumeContext<FirstMessage> consumeContext)
    {
        var secondMessage = new SecondMessage();

        // Await the publish instead of discarding its task, so failures are observed by the caller.
        await consumeContext.Publish(secondMessage);
    }
}
问题:有没有办法在使用IBusControl
发送/发布消息时在消费者之间共享上下文数据?
非常感谢
正如文档中所解释的,消费者(及其依赖项)在发送/发布消息时必须使用以下之一:
- ConsumeContext,通常在消费者内部使用
- IPublishEndpoint 或 ISendEndpointProvider,通常由消费者的 scoped 依赖项使用
- IBus,仅作为最后的手段,因为入站消息的所有上下文数据都会丢失
关于您的最后一个问题,“有没有办法在使用 IBusControl 发送/发布消息时在消费者之间共享上下文数据?”答案是否定的。必须使用上下文才能访问任何上下文数据。
我在带有 RabbitMQ(用于本地开发)和 SQS(用于部署)的 .NET Core 应用程序中使用 MassTransit 7.2.2,其中单个消息处理可能导致创建和处理多个新消息。
所有消息共享相同的基类型
/// <summary>
/// Common base type for all messages. Assigns a fresh <see cref="CorrelationId"/> and a UTC
/// <see cref="CreationDate"/> when constructed.
/// </summary>
public class BaseMessage : CorrelatedBy<Guid>
{
    /// <summary>
    /// Initializes the message with a new correlation id and the current UTC time.
    /// </summary>
    public BaseMessage()
    {
        CorrelationId = Guid.NewGuid();
        CreationDate = DateTime.UtcNow;
    }

    /// <summary>Unique id of this message; defaults to a new Guid.</summary>
    public Guid CorrelationId { get; init; }

    /// <summary>UTC time at which the message object was created.</summary>
    /// <remarks>
    /// Declared with <c>init</c> (like <see cref="CorrelationId"/>) so the value can be assigned
    /// during object initialization. NOTE(review): a get-only property is typically not populated
    /// by JSON deserializers, which would leave the receiver with the deserialization time
    /// instead of the original creation time - confirm against the configured serializer.
    /// </remarks>
    public DateTime CreationDate { get; init; }

    /// <summary>Optional id shared by all messages of one conversation.</summary>
    public Guid? ConversationId { get; set; }
}
所有消息的基本处理流程都相同,每个消费者都有一个服务。
/// <summary>
/// Message handled by FirstConsumer; adds no members beyond those of <see cref="BaseMessage"/>.
/// </summary>
public class FirstMessage : BaseMessage { }
/// <summary>
/// Consumes <see cref="FirstMessage"/>: logs the tracing ids of the incoming context and
/// hands the message to <see cref="FirstService"/>.
/// </summary>
public class FirstConsumer : IConsumer<FirstMessage>
{
    private readonly ILogger<FirstConsumer> _logger;
    private readonly FirstService _service;

    /// <param name="logger">Logger used to record the tracing ids.</param>
    /// <param name="service">Service that processes the message.</param>
    public FirstConsumer(ILogger<FirstConsumer> logger, FirstService service)
    {
        _logger = logger;
        _service = service;
    }

    /// <summary>
    /// Logs CorrelationId, ConversationId and InitiatorId, then processes the message.
    /// </summary>
    /// <param name="context">Consume context of the incoming message.</param>
    /// <returns>A task that completes when the service has finished processing.</returns>
    public async Task Consume(ConsumeContext<FirstMessage> context)
    {
        _logger.LogInformation($"FirstConsumer CorrelationId: {context.CorrelationId} and ConversationId: {context.ConversationId} and InitiatorId: {context.InitiatorId}");

        // Await the service so the consumer does not complete before processing has finished
        // and so that exceptions from the service surface here instead of being dropped.
        await _service.Process(context.Message);
    }
}
/// <summary>
/// Processes a <see cref="FirstMessage"/> by publishing a new SecondMessage on the bus.
/// </summary>
public class FirstService
{
    private readonly IBusControl _busControl;
    private readonly ILogger<FirstService> _logger;

    /// <param name="busControl">Bus used to publish follow-up messages.</param>
    /// <param name="logger">Logger for this service.</param>
    public FirstService(IBusControl busControl, ILogger<FirstService> logger)
    {
        _busControl = busControl;
        _logger = logger;
    }

    /// <summary>
    /// Publishes a new SecondMessage.
    /// </summary>
    /// <param name="firstMessage">The message being processed (not read by this method).</param>
    /// <returns>A task that completes when the publish has completed.</returns>
    public async Task Process(FirstMessage firstMessage)
    {
        var secondMessage = new SecondMessage();

        // Await the publish instead of discarding its task, so failures are observed by the caller.
        await _busControl.Publish(secondMessage);
    }
}
上面的代码是一个例子,实际的代码库有 30 多个消费者,并且都有相同的模式,即每个消费者有一个服务,消息被传递给服务进行处理。
我正在尝试通过使用 ID 实施端到端跟踪消息的解决方案。
- ConversationId - 用于跟踪图表中所有消费者日志的唯一 ID
- CorrelationId - 用于在消费者中跟踪日志的唯一 ID
- InitiatorId - Parent Id
有一个消息处理图看起来像
FirstConsumer -> SecondConsumer -> ThirdConsumer.
我有以下过滤器
ConsumeFilter
/// <summary>
/// Consume filter that pushes the CorrelationId, ConversationId and InitiatorId of the
/// consume context as log properties before invoking the rest of the pipe.
/// </summary>
/// <typeparam name="TContext">Consume context type flowing through the pipe.</typeparam>
/// <typeparam name="TMessage">Message type carried by the context.</typeparam>
public class SimpleConsumeMessageFilter<TContext, TMessage> : IFilter<TContext>
    where TContext : class, ConsumeContext<TMessage>
    where TMessage : class
{
    public SimpleConsumeMessageFilter()
    {
    }

    /// <summary>
    /// Pushes the three tracing ids, runs the next filter, then removes the properties again.
    /// </summary>
    /// <param name="context">The current consume context.</param>
    /// <param name="next">The remainder of the pipe.</param>
    public async Task Send(TContext context, IPipe<TContext> next)
    {
        // Dispose the handles returned by PushProperty so the properties are removed once the
        // downstream pipe has completed (or thrown), instead of being left pushed.
        // NOTE(review): assumes LogContext is Serilog.Context.LogContext, whose PushProperty
        // returns an IDisposable - confirm against the file's usings.
        using (LogContext.PushProperty("CorrelationId", context.CorrelationId))
        using (LogContext.PushProperty("ConversationId", context.ConversationId))
        using (LogContext.PushProperty("InitiatorId", context.InitiatorId))
        {
            await next.Send(context);
        }
    }

    /// <summary>Adds a "consume-filter" scope to the probe output.</summary>
    public void Probe(ProbeContext context)
    {
        context.CreateScope("consume-filter");
    }
}
/// <summary>
/// Pipe specification that adds a <c>SimpleConsumeMessageFilter</c> to the consume pipe of a
/// consumer/message pair.
/// </summary>
/// <typeparam name="TConsumer">Consumer type.</typeparam>
/// <typeparam name="TMessage">Message type.</typeparam>
public class SimpleConsumeMessagePipeSpec<TConsumer, TMessage> : IPipeSpecification<ConsumerConsumeContext<TConsumer, TMessage>>
    where TConsumer : class
    where TMessage : class
{
    /// <summary>Adds a new filter instance to the pipe being built.</summary>
    public void Apply(IPipeBuilder<ConsumerConsumeContext<TConsumer, TMessage>> builder)
    {
        var loggingFilter = new SimpleConsumeMessageFilter<ConsumerConsumeContext<TConsumer, TMessage>, TMessage>();
        builder.AddFilter(loggingFilter);
    }

    /// <summary>Reports no validation results.</summary>
    public IEnumerable<ValidationResult> Validate() => Enumerable.Empty<ValidationResult>();
}
/// <summary>
/// Consumer configuration observer that adds a <c>SimpleConsumeMessagePipeSpec</c> to each
/// consumer/message configurator it is notified about.
/// </summary>
public class SimpleConsumePipeSpecObserver : IConsumerConfigurationObserver
{
    /// <summary>Does nothing at the consumer level.</summary>
    public void ConsumerConfigured<TConsumer>(IConsumerConfigurator<TConsumer> configurator)
        where TConsumer : class
    {
        // Intentionally empty: the specification is attached per message type below.
    }

    /// <summary>Adds the consume pipe specification for this consumer/message pair.</summary>
    public void ConsumerMessageConfigured<TConsumer, TMessage>(IConsumerMessageConfigurator<TConsumer, TMessage> configurator)
        where TConsumer : class
        where TMessage : class
    {
        var pipeSpec = new SimpleConsumeMessagePipeSpec<TConsumer, TMessage>();
        configurator.AddPipeSpecification(pipeSpec);
    }
}
PublishFilter
/// <summary>
/// Publish filter that makes sure a ConversationId is set on the publish context, taking it
/// from the "ConversationId" header when present, otherwise from the message itself.
/// </summary>
/// <typeparam name="TMessage">Type of the message being published.</typeparam>
public class SimplePublishMessageFilter<TMessage> : IFilter<PublishContext<TMessage>> where TMessage : class
{
    public SimplePublishMessageFilter()
    {
    }

    /// <summary>
    /// Resolves the ConversationId for the outgoing message and then invokes the rest of the pipe.
    /// </summary>
    /// <param name="context">The publish context of the outgoing message.</param>
    /// <param name="next">The remainder of the pipe.</param>
    public async Task Send(PublishContext<TMessage> context, IPipe<PublishContext<TMessage>> next)
    {
        // TryParse instead of Parse: a null or malformed header value must not abort the publish
        // with a NullReferenceException/FormatException; it is treated like a missing header.
        if (context.Headers.TryGetHeader("ConversationId", out object headerValue)
            && Guid.TryParse(headerValue?.ToString(), out Guid conversationId))
        {
            context.ConversationId = conversationId;
        }
        else if (context.Message is BaseMessage baseEvent && !context.ConversationId.HasValue)
        {
            // No usable header: take the id carried by the message, or start a new conversation,
            // and record it in the header.
            context.ConversationId = baseEvent.ConversationId ?? Guid.NewGuid();
            context.Headers.Set("ConversationId", context.ConversationId.ToString());
        }

        await next.Send(context);
    }

    /// <summary>Adds a "publish-filter" scope to the probe output.</summary>
    public void Probe(ProbeContext context)
    {
        context.CreateScope("publish-filter");
    }
}
/// <summary>
/// Pipe specification that adds a <c>SimplePublishMessageFilter</c> to the publish pipe of a
/// message type.
/// </summary>
/// <typeparam name="TMessage">Type of the message being published.</typeparam>
public class SimplePublishMessagePipeSpec<TMessage> : IPipeSpecification<PublishContext<TMessage>> where TMessage : class
{
    /// <summary>Adds a new filter instance to the pipe being built.</summary>
    public void Apply(IPipeBuilder<PublishContext<TMessage>> builder)
    {
        var conversationFilter = new SimplePublishMessageFilter<TMessage>();
        builder.AddFilter(conversationFilter);
    }

    /// <summary>Reports no validation results.</summary>
    public IEnumerable<ValidationResult> Validate() => Enumerable.Empty<ValidationResult>();
}
/// <summary>
/// Publish pipe observer that adds a <c>SimplePublishMessagePipeSpec</c> to every message
/// publish specification it is notified about.
/// </summary>
public class SimplePublishPipeSpecObserver : IPublishPipeSpecificationObserver
{
    /// <summary>Adds the publish pipe specification for the given message type.</summary>
    public void MessageSpecificationCreated<TMessage>(IMessagePublishPipeSpecification<TMessage> specification)
        where TMessage : class
    {
        var pipeSpec = new SimplePublishMessagePipeSpec<TMessage>();
        specification.AddPipeSpecification(pipeSpec);
    }
}
Added to config via
x.UsingRabbitMq((registrationContext, rabbit) =>
{
    // Hook the consume-side observer so each configured consumer/message pair gets the logging filter.
    rabbit.ConnectConsumerConfigurationObserver(new SimpleConsumePipeSpecObserver());

    // Hook the publish-side observer so each published message type gets the ConversationId filter.
    rabbit.ConfigurePublish(publish =>
    {
        publish.ConnectPublishPipeSpecificationObserver(new SimplePublishPipeSpecObserver());
    });

    rabbit.UseDelayedMessageScheduler();
    rabbit.ConfigureEndpoints(registrationContext);

    // Broker on localhost with the guest/guest account.
    rabbit.Host("localhost", host =>
    {
        host.Username("guest");
        host.Password("guest");
    });
});
使用上述方法,当 SecondConsumer 的过滤器运行时,'CorrelationId' header 会丢失。
我尝试了以下更改,它似乎使 ID 流过消费者。
但是,采用这种方法将影响依赖于 IBusControl
接口的大部分代码/测试。我将其保留为备用选项,以防找不到任何其他解决方案。
/// <summary>
/// Variant of the service that publishes through the consumer's <c>ConsumeContext</c> instead
/// of the bus.
/// </summary>
public class FirstService
{
    private readonly ILogger<FirstService> _logger;

    /// <param name="logger">Logger for this service.</param>
    public FirstService(ILogger<FirstService> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Publishes a new SecondMessage through the supplied consume context.
    /// </summary>
    /// <param name="consumeContext">Consume context of the message being processed.</param>
    /// <returns>A task that completes when the publish has completed.</returns>
    public async Task Process(ConsumeContext<FirstMessage> consumeContext)
    {
        var secondMessage = new SecondMessage();

        // Await the publish instead of discarding its task, so failures are observed by the caller.
        await consumeContext.Publish(secondMessage);
    }
}
问题:有没有办法在使用IBusControl
发送/发布消息时在消费者之间共享上下文数据?
非常感谢
正如文档中所解释的,消费者(及其依赖项)在发送/发布消息时必须使用以下之一:
- ConsumeContext,通常在消费者内部使用
- IPublishEndpoint 或 ISendEndpointProvider,通常由消费者的 scoped 依赖项使用
- IBus,仅作为最后的手段,因为入站消息的所有上下文数据都会丢失
关于您的最后一个问题,“有没有办法在使用 IBusControl 发送/发布消息时在消费者之间共享上下文数据?”答案是否定的。必须使用上下文才能访问任何上下文数据。