通过RabbitMQ维护correlationId
Maintain correlationId through RabbitMQ
我一直在考虑使用 RabbitMQ 进行跨服务消息传递。我已经能够使用 Spring 注释配置我们的交换/队列/DLX 等。示例(简单)队列侦听器:
@RabbitListener(queues = RabbitMessageType.QueueNames.SMS_NOTIFICATIONS)
public void receive1(Message message) throws Exception {
RabbitMessageDto messageDto = OBJECT_MAPPER.readValue(message.getBody(), RabbitMessageDto.class);
SmsNotificationDto payload = OBJECT_MAPPER.readValue(messageDto.getPayload(), SmsNotificationDto.class);
log.info(payload.getMessage());
}
我正在使用 spring-cloud-sleuth 生成 correlationIds / traceIds,它们在使用 HTTP 请求与其他服务对话时会被保留,使我们能够在我们各种服务的日志中跟踪给定的 ID微服务。
虽然我可以获得当前的 traceId 并将其插入到我的 DTO 中:
@Autowired
private Tracer tracer;
private RabbitMessageDto createRabbitMessageWithPayload(String messageType,
String messageVersion,
Object payload) {
return new RabbitMessageDto.Builder()
.withTraceId(tracer.getCurrentSpan().getTraceId())
.withDtoName(messageType)
.withDtoVersion(messageVersion)
.withPayload(payload)
.build();
}
我找不到在接收方法中设置traceId的方法。
Google 不断将我带到 spring-cloud-stream 和 spring-cloud-stream-starter-rabbit;文档似乎表明可以自动插入/设置 traceIds,但我根本不熟悉 spring-cloud-stream,也没有发现文档特别有用。
所以,我希望得到以下问题的答案:
- 使用 SpanAdjuster 或 Tracer 等;我可以根据 DTO 中的值设置 traceId 吗?
- 使用 spring-cloud-stream,我可以自动插入/检索 traceId,我应该从哪里开始?
因此,如果有人遇到这种情况,希望设置 sleuth traceId 上下文,我们提出了以下解决方案:
@Autowired Tracer tracer;
private void someMethod(long traceId) {
Span span = Span.builder()
.traceId(traceId)
.spanId(new Random().nextLong())
.build();
tracer.continueSpan(span);
// do work
tracer.closeSpan(span);
}
应该注意的是,所有文档都说跨度应该在您完成后关闭。上面的 do work 部分应该用 try / catch / finally 块包裹以确保它被关闭。
在 span 仍然打开的情况下调用的任何方法都将继承 traceId。
编辑
我还要说,似乎更好的解决方案是用 spring-cloud-stream 替换 Spring AMQP 库;据我所知,这应该会自动将 traceId 包含在兔子消息 (correlationId) 中,并将其设置在另一端。但是,我还没有机会对此进行测试。
我一直在考虑使用 RabbitMQ 进行跨服务消息传递。我已经能够使用 Spring 注释配置我们的交换/队列/DLX 等。示例(简单)队列侦听器:
@RabbitListener(queues = RabbitMessageType.QueueNames.SMS_NOTIFICATIONS)
public void receive1(Message message) throws Exception {
RabbitMessageDto messageDto = OBJECT_MAPPER.readValue(message.getBody(), RabbitMessageDto.class);
SmsNotificationDto payload = OBJECT_MAPPER.readValue(messageDto.getPayload(), SmsNotificationDto.class);
log.info(payload.getMessage());
}
我正在使用 spring-cloud-sleuth 生成 correlationIds / traceIds,它们在使用 HTTP 请求与其他服务对话时会被保留,使我们能够在我们各种服务的日志中跟踪给定的 ID微服务。
虽然我可以获得当前的 traceId 并将其插入到我的 DTO 中:
@Autowired
private Tracer tracer;
private RabbitMessageDto createRabbitMessageWithPayload(String messageType,
String messageVersion,
Object payload) {
return new RabbitMessageDto.Builder()
.withTraceId(tracer.getCurrentSpan().getTraceId())
.withDtoName(messageType)
.withDtoVersion(messageVersion)
.withPayload(payload)
.build();
}
我找不到在接收方法中设置traceId的方法。
Google 不断将我带到 spring-cloud-stream 和 spring-cloud-stream-starter-rabbit;文档似乎表明可以自动插入/设置 traceIds,但我根本不熟悉 spring-cloud-stream,也没有发现文档特别有用。
所以,我希望得到以下问题的答案:
- 使用 SpanAdjuster 或 Tracer 等;我可以根据 DTO 中的值设置 traceId 吗?
- 使用 spring-cloud-stream,我可以自动插入/检索 traceId,我应该从哪里开始?
因此,如果有人遇到这种情况,希望设置 sleuth traceId 上下文,我们提出了以下解决方案:
@Autowired Tracer tracer;
private void someMethod(long traceId) {
Span span = Span.builder()
.traceId(traceId)
.spanId(new Random().nextLong())
.build();
tracer.continueSpan(span);
// do work
tracer.closeSpan(span);
}
应该注意的是,所有文档都说跨度应该在您完成后关闭。上面的 do work 部分应该用 try / catch / finally 块包裹以确保它被关闭。
在 span 仍然打开的情况下调用的任何方法都将继承 traceId。
编辑
我还要说,似乎更好的解决方案是用 spring-cloud-stream 替换 Spring AMQP 库;据我所知,这应该会自动将 traceId 包含在兔子消息 (correlationId) 中,并将其设置在另一端。但是,我还没有机会对此进行测试。