Spring Cloud Sleuth 导致 ActiveMQ/JMS 消息 headers 丢失

Spring Coud Sleuth causing ActiveMQ/JMS message headers to be lost

版本:

我想将 Sleuth 添加到现在使用 ActiveMQ 的 pre-exisitng 项目中,之前它使用的是 JMS。当我这样做时,来自 ActiceMQ 消息的值得到 blocked/removed 和(一个是“文件名”,这是 S2 请求的键值)。其他 JMS 值似乎仍然正常。

我需要了解为什么 non-JMS 值变得 blocked/removed(我找不到任何关于导致这种情况发生的原因的信息)以防止错误。

我知道我可以使用 spring.sleuth.messaging.jms.enabled=true 为 JMS 禁用 Sleuth,但向前推进我希望能够跟踪 ActiveMQ/JMS 代码,因此该解决方法不是特别有吸引力。

由于这是 pre-exiting 代码,我还想尽可能避免 re-write 它。

有没有人让 SPring Cloud Sleuth 与 ActiveMQ/JMS 一起工作,也许可以指出哪里出了问题?

编辑:

根据 Marcin 的初始响应,我们发现以下版本可以编译和执行,但仍然存在重大问题:

没有侦探

ActiveMQ 消息详细信息:

日志消息(存在“文件名”、“CamelAwsS3Etag”等):

2021-08-16 10:10:37.889  INFO [MyApp,,] 28775 --- [umer[taskQueue]] taskQueueConsumer: ***HEADERS IN***: {CamelAwsS3ETag=39d029a87fa4c6aaee5f1de643d9f3f6, Content-Type=application/json, filename=_bl001/group0/_bl001-group0-1629104686042.zip, JMSCorrelationID=null, JMSCorrelationIDAsBytes=null, JMSDeliveryMode=2, JMSDestination=queue://taskQueue, JMSExpiration=0, JMSMessageID=ID:server-44053-1629104652998-1:3:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1629104686350, JMSType=null, JMSXGroupID=_bl001-group0, JMSXGroupSeq=0, JMSXUserID=null}

有侦探

ActiveMQ 消息详细信息:

日志消息(“文件名”、“CamelAwsS3Etag”等缺失):

2021-08-16 10:24:33.821  INFO [MyApp,,] 31553 --- [umer[taskQueue]] taskQueueConsumer: ***HEADERS IN***: {JMSCorrelationID=null, JMSCorrelationIDAsBytes=null, JMSDeliveryMode=2, JMSDestination=queue://taskQueue, JMSExpiration=0, JMSMessageID=ID:server-42561-1629105658959-1:3:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1629105675306, JMSType=null, JMSXGroupID=_bl000-group1, JMSXGroupSeq=0, JMSXUserID=null}

示例Java代码

@Component
public class MyAppCoreRouter extends RouteBuilder {

    /* Other code */

    @Override
    public void configure() {
        from("activemq:queue:taskQueue?concurrentConsumers=" + taskNumberOfConcurrentConsumers)
                .log("***HEADERS IN***: ${headers}")
                .routeId("taskQueueConsumer")
                .threads(taskNumberOfConcurrentConsumers)
                .pollEnrich().simple("aws-s3://myapp-task?amazonS3Client=#amazonS3Client&fileName=${header.filename}&operation=getObject")
                .choice()
                .when(header(S3Constants.KEY).endsWith(".zip"))
                .to("file://" + taskLocalUnCompressedEndpoint + "?fileName=${header.CamelAwsS3ETag}/${header.CamelAwsS3Key}")
                .process(exchange -> {
                    String camelAwsS3ETag = exchange.getIn().getHeader("CamelAwsS3ETag", String.class);
                    String camelAwsS3Key = exchange.getIn().getHeader("CamelAwsS3Key", String.class);
                    File uniqueDir = new File(taskLocalUnCompressedEndpoint, camelAwsS3ETag);
                    File taskZip = new File(uniqueDir, camelAwsS3Key);
                    new ZipFile(taskZip).extractAll(uniqueDir.getAbsolutePath());
                })
                .setHeader("resourceDirectory", simple(taskLocalUnCompressedEndpoint + "/${header.CamelAwsS3ETag}")).setHeader("schedule.time", simple("${date:now}"))
                .removeHeader("CamelAwsS3Headers")
                .log(LoggingLevel.DEBUG, "Processing batch job ${headers}")
                .to("spring-batch:importMyAppRecordJob")
                .endChoice()
                .otherwise()
                .log(LoggingLevel.WARN, "Unexpected file type, filtering file name ${header.CamelAwsS3Key} ")
                .end();

        /* Other code */
    }
}

您使用的 Boot、Cloud 和 Sleuth 版本有误。要使用 Sleuth 3.0.x,您需要使用 Cloud 2020.0.x 和 Boot 2.4.x 或 2.5.x.