RabbitMq - ConversationId 与 CorrelationId - 哪个更适合跟踪特定请求?
RabbitMq - ConversationId vs CorrelationId - Which is the more appropriate for tracking a specific request?
RabbitMQ 似乎有两个非常相似的属性,我不完全理解其中的区别。 ConversationId
和 CorrelationId
.
我的用例如下。我有一个生成 Guid
的网站。该网站调用 API,将该唯一标识符添加到 HttpRequest
headers。这又会向 RabbitMQ 发布一条消息。该消息由第一个消费者处理并在别处传递给另一个消费者,依此类推。
出于记录目的,我想记录一个标识符,该标识符将初始请求与所有后续操作联系在一起。这对于整个应用程序不同部分的旅程应该是唯一的。因此。当记录到类似 Serilog/ElasticSearch 的内容时,这就很容易看出哪个请求触发了初始请求,并且整个应用程序中该请求的所有日志条目都可以关联在一起。
我创建了一个提供程序,它查看传入的 HttpRequest
以获取标识符。我将其命名为 "CorrelationId",但我开始怀疑是否真的应该将其命名为 "ConversationId"。就 RabbitMQ 而言,"ConversationId" 的想法更适合这个模型,还是 "CorrelationId" 更好?
这两个概念有什么区别?
就代码而言,我希望执行以下操作。首先在我的 API 中注册总线并配置 SendPublish
以使用提供商的 CorrelationId
。
// bus registration in the API
var busSettings = context.Resolve<BusSettings>();
// using AspNetCoreCorrelationIdProvider
var correlationIdProvider = context.Resolve<ICorrelationIdProvider>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
cfg.ConfigurePublish(x => x.UseSendExecute(sendContext =>
{
// which one is more appropriate
//sendContext.ConversationId = correlationIdProvider.GetCorrelationId();
sendContext.CorrelationId = correlationIdProvider.GetCorrelationId();
}));
});
作为参考,这是我的简单提供者界面
// define the interface
public interface ICorrelationIdProvider
{
Guid GetCorrelationId();
}
以及 AspNetCore 实现,它提取调用客户端(即网站)设置的唯一 ID。
public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider
{
private IHttpContextAccessor _httpContextAccessor;
public AspNetCoreCorrelationIdProvider(IHttpContextAccessor httpContextAccessor)
{
_httpContextAccessor = httpContextAccessor;
}
public Guid GetCorrelationId()
{
if (_httpContextAccessor.HttpContext.Request.Headers.TryGetValue("correlation-Id", out StringValues headers))
{
var header = headers.FirstOrDefault();
if (Guid.TryParse(header, out Guid headerCorrelationId))
{
return headerCorrelationId;
}
}
return Guid.NewGuid();
}
}
最后,我的服务主机是简单的 windows 服务应用程序,它们坐下并使用已发布的消息。他们使用以下内容来获取 CorrelationId,并且很可能会发布给其他消费者以及其他服务主机。
public class MessageContextCorrelationIdProvider : ICorrelationIdProvider
{
/// <summary>
/// The consume context
/// </summary>
private readonly ConsumeContext _consumeContext;
/// <summary>
/// Initializes a new instance of the <see cref="MessageContextCorrelationIdProvider"/> class.
/// </summary>
/// <param name="consumeContext">The consume context.</param>
public MessageContextCorrelationIdProvider(ConsumeContext consumeContext)
{
_consumeContext = consumeContext;
}
/// <summary>
/// Gets the correlation identifier.
/// </summary>
/// <returns></returns>
public Guid GetCorrelationId()
{
// correlationid or conversationIs?
if (_consumeContext.CorrelationId.HasValue && _consumeContext.CorrelationId != Guid.Empty)
{
return _consumeContext.CorrelationId.Value;
}
return Guid.NewGuid();
}
}
然后我的消费者中有一个记录器,它使用该提供者提取 CorrelationId
:
public async Task Consume(ConsumeContext<IMyEvent> context)
{
var correlationId = _correlationProvider.GetCorrelationId();
_logger.Info(correlationId, $"#### IMyEvent received for customer:{context.Message.CustomerId}");
try
{
await _mediator.Send(new SomeOtherRequest(correlationId) { SomeObject: context.Message.SomeObject });
}
catch (Exception e)
{
_logger.Exception(e, correlationId, $"Exception:{e}");
throw;
}
_logger.Info(correlationId, $"Finished processing: {DateTime.Now}");
}
阅读 docs,它说了以下关于 "ConversationId" 的内容:
The conversation is created by the first message that is sent or
published, in which no existing context is available (such as when a
message is sent or published by using IBus.Send or IBus.Publish). If
an existing context is used to send or publish a message, the
ConversationId is copied to the new message, ensuring that a set of
messages within the same conversation have the same identifier.
现在我开始认为我的术语混淆了,从技术上讲,这是一段对话(尽管 'conversation' 就像 'the telephone game')。
那么,在这个用例中是 CorrelationId
,还是 ConversationId
?请帮助我正确使用术语!!
在消息对话中(提示不祥的乐谱),可以有一条消息(我告诉你做某事,或者我告诉所有正在收听的人发生了什么事)或多条消息(我告诉你做某事某事,然后你告诉了其他人,或者我告诉所有正在收听的人发生了某事,然后那些听众告诉了他们的朋友,依此类推。
使用 MassTransit,从第一条消息到最后一条消息,如果使用得当,这些消息中的每一条都会有相同的 ConversationId
。 MassTransit 在消息消费期间将 属性 从 ConsumeContext
未修改地复制到每个传出消息。这使得所有内容都成为同一 trace - 对话的一部分。
但是,MassTransit 默认不设置 CorrelationId。如果消息 属性 被命名为 CorrelationId(或 CommandId 或 EventId),它可以自动设置,或者您也可以添加自己的名称。
如果 CorrelationId 存在于消费消息中,任何传出消息都会将该 CorrelationId 属性 复制到 InitiatorId 属性(因果关系——消费消息启动了创建后续消息)。这形成了一个链(或跨度,在跟踪术语中),可以跟踪它以显示从初始消息开始的消息传播。
CorrelationId 应被视为命令或事件的标识符,以便可以在整个系统日志中看到该命令的效果。
在我看来,您来自 HTTP 的输入可能是 Initiator,因此将该标识符复制到 InitiatorId 并为消息创建一个新的 CorrelationId,或者您可能只想对初始 CorrelationId 使用相同的标识符,让后续消息以它为发起者。
RabbitMQ 似乎有两个非常相似的属性,我不完全理解其中的区别。 ConversationId
和 CorrelationId
.
我的用例如下。我有一个生成 Guid
的网站。该网站调用 API,将该唯一标识符添加到 HttpRequest
headers。这又会向 RabbitMQ 发布一条消息。该消息由第一个消费者处理并在别处传递给另一个消费者,依此类推。
出于记录目的,我想记录一个标识符,该标识符将初始请求与所有后续操作联系在一起。这对于整个应用程序不同部分的旅程应该是唯一的。因此。当记录到类似 Serilog/ElasticSearch 的内容时,这就很容易看出哪个请求触发了初始请求,并且整个应用程序中该请求的所有日志条目都可以关联在一起。
我创建了一个提供程序,它查看传入的 HttpRequest
以获取标识符。我将其命名为 "CorrelationId",但我开始怀疑是否真的应该将其命名为 "ConversationId"。就 RabbitMQ 而言,"ConversationId" 的想法更适合这个模型,还是 "CorrelationId" 更好?
这两个概念有什么区别?
就代码而言,我希望执行以下操作。首先在我的 API 中注册总线并配置 SendPublish
以使用提供商的 CorrelationId
。
// bus registration in the API
var busSettings = context.Resolve<BusSettings>();
// using AspNetCoreCorrelationIdProvider
var correlationIdProvider = context.Resolve<ICorrelationIdProvider>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
cfg.ConfigurePublish(x => x.UseSendExecute(sendContext =>
{
// which one is more appropriate
//sendContext.ConversationId = correlationIdProvider.GetCorrelationId();
sendContext.CorrelationId = correlationIdProvider.GetCorrelationId();
}));
});
作为参考,这是我的简单提供者界面
// define the interface
public interface ICorrelationIdProvider
{
Guid GetCorrelationId();
}
以及 AspNetCore 实现,它提取调用客户端(即网站)设置的唯一 ID。
public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider
{
private IHttpContextAccessor _httpContextAccessor;
public AspNetCoreCorrelationIdProvider(IHttpContextAccessor httpContextAccessor)
{
_httpContextAccessor = httpContextAccessor;
}
public Guid GetCorrelationId()
{
if (_httpContextAccessor.HttpContext.Request.Headers.TryGetValue("correlation-Id", out StringValues headers))
{
var header = headers.FirstOrDefault();
if (Guid.TryParse(header, out Guid headerCorrelationId))
{
return headerCorrelationId;
}
}
return Guid.NewGuid();
}
}
最后,我的服务主机是简单的 windows 服务应用程序,它们坐下并使用已发布的消息。他们使用以下内容来获取 CorrelationId,并且很可能会发布给其他消费者以及其他服务主机。
public class MessageContextCorrelationIdProvider : ICorrelationIdProvider
{
/// <summary>
/// The consume context
/// </summary>
private readonly ConsumeContext _consumeContext;
/// <summary>
/// Initializes a new instance of the <see cref="MessageContextCorrelationIdProvider"/> class.
/// </summary>
/// <param name="consumeContext">The consume context.</param>
public MessageContextCorrelationIdProvider(ConsumeContext consumeContext)
{
_consumeContext = consumeContext;
}
/// <summary>
/// Gets the correlation identifier.
/// </summary>
/// <returns></returns>
public Guid GetCorrelationId()
{
// correlationid or conversationIs?
if (_consumeContext.CorrelationId.HasValue && _consumeContext.CorrelationId != Guid.Empty)
{
return _consumeContext.CorrelationId.Value;
}
return Guid.NewGuid();
}
}
然后我的消费者中有一个记录器,它使用该提供者提取 CorrelationId
:
public async Task Consume(ConsumeContext<IMyEvent> context)
{
var correlationId = _correlationProvider.GetCorrelationId();
_logger.Info(correlationId, $"#### IMyEvent received for customer:{context.Message.CustomerId}");
try
{
await _mediator.Send(new SomeOtherRequest(correlationId) { SomeObject: context.Message.SomeObject });
}
catch (Exception e)
{
_logger.Exception(e, correlationId, $"Exception:{e}");
throw;
}
_logger.Info(correlationId, $"Finished processing: {DateTime.Now}");
}
阅读 docs,它说了以下关于 "ConversationId" 的内容:
The conversation is created by the first message that is sent or published, in which no existing context is available (such as when a message is sent or published by using IBus.Send or IBus.Publish). If an existing context is used to send or publish a message, the ConversationId is copied to the new message, ensuring that a set of messages within the same conversation have the same identifier.
现在我开始认为我的术语混淆了,从技术上讲,这是一段对话(尽管 'conversation' 就像 'the telephone game')。
那么,在这个用例中是 CorrelationId
,还是 ConversationId
?请帮助我正确使用术语!!
在消息对话中(提示不祥的乐谱),可以有一条消息(我告诉你做某事,或者我告诉所有正在收听的人发生了什么事)或多条消息(我告诉你做某事某事,然后你告诉了其他人,或者我告诉所有正在收听的人发生了某事,然后那些听众告诉了他们的朋友,依此类推。
使用 MassTransit,从第一条消息到最后一条消息,如果使用得当,这些消息中的每一条都会有相同的 ConversationId
。 MassTransit 在消息消费期间将 属性 从 ConsumeContext
未修改地复制到每个传出消息。这使得所有内容都成为同一 trace - 对话的一部分。
但是,MassTransit 默认不设置 CorrelationId。如果消息 属性 被命名为 CorrelationId(或 CommandId 或 EventId),它可以自动设置,或者您也可以添加自己的名称。
如果 CorrelationId 存在于消费消息中,任何传出消息都会将该 CorrelationId 属性 复制到 InitiatorId 属性(因果关系——消费消息启动了创建后续消息)。这形成了一个链(或跨度,在跟踪术语中),可以跟踪它以显示从初始消息开始的消息传播。
CorrelationId 应被视为命令或事件的标识符,以便可以在整个系统日志中看到该命令的效果。
在我看来,您来自 HTTP 的输入可能是 Initiator,因此将该标识符复制到 InitiatorId 并为消息创建一个新的 CorrelationId,或者您可能只想对初始 CorrelationId 使用相同的标识符,让后续消息以它为发起者。