Spring Cloud Sleuth 导致 ActiveMQ/JMS 消息 headers 丢失
Spring Coud Sleuth causing ActiveMQ/JMS message headers to be lost
版本:
- SpringBoot: 2.3.12.RELEASE
- SpringCloud:Hoxton.SR12
- SpringCloud 入门侦探:3.0.3
- 骆驼:3.4.6
我想将 Sleuth 添加到现在使用 ActiveMQ 的 pre-exisitng 项目中,之前它使用的是 JMS。当我这样做时,来自 ActiceMQ 消息的值得到 blocked/removed 和(一个是“文件名”,这是 S2 请求的键值)。其他 JMS 值似乎仍然正常。
我需要了解为什么 non-JMS 值变得 blocked/removed(我找不到任何关于导致这种情况发生的原因的信息)以防止错误。
我知道我可以使用 spring.sleuth.messaging.jms.enabled=true
为 JMS 禁用 Sleuth,但向前推进我希望能够跟踪 ActiveMQ/JMS 代码,因此该解决方法不是特别有吸引力。
由于这是 pre-exiting 代码,我还想尽可能避免 re-write 它。
有没有人让 SPring Cloud Sleuth 与 ActiveMQ/JMS 一起工作,也许可以指出哪里出了问题?
编辑:
根据 Marcin 的初始响应,我们发现以下版本可以编译和执行,但仍然存在重大问题:
- SpringBoot: 2.4.8
- 春云:2020.0.3
- 骆驼:3.7.5
没有侦探
ActiveMQ 消息详细信息:
日志消息(存在“文件名”、“CamelAwsS3Etag”等):
2021-08-16 10:10:37.889 INFO [MyApp,,] 28775 --- [umer[taskQueue]] taskQueueConsumer: ***HEADERS IN***: {CamelAwsS3ETag=39d029a87fa4c6aaee5f1de643d9f3f6, Content-Type=application/json, filename=_bl001/group0/_bl001-group0-1629104686042.zip, JMSCorrelationID=null, JMSCorrelationIDAsBytes=null, JMSDeliveryMode=2, JMSDestination=queue://taskQueue, JMSExpiration=0, JMSMessageID=ID:server-44053-1629104652998-1:3:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1629104686350, JMSType=null, JMSXGroupID=_bl001-group0, JMSXGroupSeq=0, JMSXUserID=null}
有侦探
ActiveMQ 消息详细信息:
日志消息(“文件名”、“CamelAwsS3Etag”等缺失):
2021-08-16 10:24:33.821 INFO [MyApp,,] 31553 --- [umer[taskQueue]] taskQueueConsumer: ***HEADERS IN***: {JMSCorrelationID=null, JMSCorrelationIDAsBytes=null, JMSDeliveryMode=2, JMSDestination=queue://taskQueue, JMSExpiration=0, JMSMessageID=ID:server-42561-1629105658959-1:3:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1629105675306, JMSType=null, JMSXGroupID=_bl000-group1, JMSXGroupSeq=0, JMSXUserID=null}
示例Java代码
@Component
public class MyAppCoreRouter extends RouteBuilder {
/* Other code */
@Override
public void configure() {
from("activemq:queue:taskQueue?concurrentConsumers=" + taskNumberOfConcurrentConsumers)
.log("***HEADERS IN***: ${headers}")
.routeId("taskQueueConsumer")
.threads(taskNumberOfConcurrentConsumers)
.pollEnrich().simple("aws-s3://myapp-task?amazonS3Client=#amazonS3Client&fileName=${header.filename}&operation=getObject")
.choice()
.when(header(S3Constants.KEY).endsWith(".zip"))
.to("file://" + taskLocalUnCompressedEndpoint + "?fileName=${header.CamelAwsS3ETag}/${header.CamelAwsS3Key}")
.process(exchange -> {
String camelAwsS3ETag = exchange.getIn().getHeader("CamelAwsS3ETag", String.class);
String camelAwsS3Key = exchange.getIn().getHeader("CamelAwsS3Key", String.class);
File uniqueDir = new File(taskLocalUnCompressedEndpoint, camelAwsS3ETag);
File taskZip = new File(uniqueDir, camelAwsS3Key);
new ZipFile(taskZip).extractAll(uniqueDir.getAbsolutePath());
})
.setHeader("resourceDirectory", simple(taskLocalUnCompressedEndpoint + "/${header.CamelAwsS3ETag}")).setHeader("schedule.time", simple("${date:now}"))
.removeHeader("CamelAwsS3Headers")
.log(LoggingLevel.DEBUG, "Processing batch job ${headers}")
.to("spring-batch:importMyAppRecordJob")
.endChoice()
.otherwise()
.log(LoggingLevel.WARN, "Unexpected file type, filtering file name ${header.CamelAwsS3Key} ")
.end();
/* Other code */
}
}
您使用的 Boot、Cloud 和 Sleuth 版本有误。要使用 Sleuth 3.0.x,您需要使用 Cloud 2020.0.x 和 Boot 2.4.x 或 2.5.x.
版本:
- SpringBoot: 2.3.12.RELEASE
- SpringCloud:Hoxton.SR12
- SpringCloud 入门侦探:3.0.3
- 骆驼:3.4.6
我想将 Sleuth 添加到现在使用 ActiveMQ 的 pre-exisitng 项目中,之前它使用的是 JMS。当我这样做时,来自 ActiceMQ 消息的值得到 blocked/removed 和(一个是“文件名”,这是 S2 请求的键值)。其他 JMS 值似乎仍然正常。
我需要了解为什么 non-JMS 值变得 blocked/removed(我找不到任何关于导致这种情况发生的原因的信息)以防止错误。
我知道我可以使用 spring.sleuth.messaging.jms.enabled=true
为 JMS 禁用 Sleuth,但向前推进我希望能够跟踪 ActiveMQ/JMS 代码,因此该解决方法不是特别有吸引力。
由于这是 pre-exiting 代码,我还想尽可能避免 re-write 它。
有没有人让 SPring Cloud Sleuth 与 ActiveMQ/JMS 一起工作,也许可以指出哪里出了问题?
编辑:
根据 Marcin 的初始响应,我们发现以下版本可以编译和执行,但仍然存在重大问题:
- SpringBoot: 2.4.8
- 春云:2020.0.3
- 骆驼:3.7.5
没有侦探
ActiveMQ 消息详细信息:
日志消息(存在“文件名”、“CamelAwsS3Etag”等):
2021-08-16 10:10:37.889 INFO [MyApp,,] 28775 --- [umer[taskQueue]] taskQueueConsumer: ***HEADERS IN***: {CamelAwsS3ETag=39d029a87fa4c6aaee5f1de643d9f3f6, Content-Type=application/json, filename=_bl001/group0/_bl001-group0-1629104686042.zip, JMSCorrelationID=null, JMSCorrelationIDAsBytes=null, JMSDeliveryMode=2, JMSDestination=queue://taskQueue, JMSExpiration=0, JMSMessageID=ID:server-44053-1629104652998-1:3:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1629104686350, JMSType=null, JMSXGroupID=_bl001-group0, JMSXGroupSeq=0, JMSXUserID=null}
有侦探
ActiveMQ 消息详细信息:
日志消息(“文件名”、“CamelAwsS3Etag”等缺失):
2021-08-16 10:24:33.821 INFO [MyApp,,] 31553 --- [umer[taskQueue]] taskQueueConsumer: ***HEADERS IN***: {JMSCorrelationID=null, JMSCorrelationIDAsBytes=null, JMSDeliveryMode=2, JMSDestination=queue://taskQueue, JMSExpiration=0, JMSMessageID=ID:server-42561-1629105658959-1:3:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1629105675306, JMSType=null, JMSXGroupID=_bl000-group1, JMSXGroupSeq=0, JMSXUserID=null}
示例Java代码
@Component
public class MyAppCoreRouter extends RouteBuilder {
/* Other code */
@Override
public void configure() {
from("activemq:queue:taskQueue?concurrentConsumers=" + taskNumberOfConcurrentConsumers)
.log("***HEADERS IN***: ${headers}")
.routeId("taskQueueConsumer")
.threads(taskNumberOfConcurrentConsumers)
.pollEnrich().simple("aws-s3://myapp-task?amazonS3Client=#amazonS3Client&fileName=${header.filename}&operation=getObject")
.choice()
.when(header(S3Constants.KEY).endsWith(".zip"))
.to("file://" + taskLocalUnCompressedEndpoint + "?fileName=${header.CamelAwsS3ETag}/${header.CamelAwsS3Key}")
.process(exchange -> {
String camelAwsS3ETag = exchange.getIn().getHeader("CamelAwsS3ETag", String.class);
String camelAwsS3Key = exchange.getIn().getHeader("CamelAwsS3Key", String.class);
File uniqueDir = new File(taskLocalUnCompressedEndpoint, camelAwsS3ETag);
File taskZip = new File(uniqueDir, camelAwsS3Key);
new ZipFile(taskZip).extractAll(uniqueDir.getAbsolutePath());
})
.setHeader("resourceDirectory", simple(taskLocalUnCompressedEndpoint + "/${header.CamelAwsS3ETag}")).setHeader("schedule.time", simple("${date:now}"))
.removeHeader("CamelAwsS3Headers")
.log(LoggingLevel.DEBUG, "Processing batch job ${headers}")
.to("spring-batch:importMyAppRecordJob")
.endChoice()
.otherwise()
.log(LoggingLevel.WARN, "Unexpected file type, filtering file name ${header.CamelAwsS3Key} ")
.end();
/* Other code */
}
}
您使用的 Boot、Cloud 和 Sleuth 版本有误。要使用 Sleuth 3.0.x,您需要使用 Cloud 2020.0.x 和 Boot 2.4.x 或 2.5.x.