Why doesn't tracing information propagate over Kafka messages when Spring Sleuth is on the classpath?
Tracing information does not propagate over Kafka messages because the method SleuthKafkaAspect.wrapProducerFactory() is never triggered. On the producer side, the message is sent correctly and the tracing information is logged correctly. On the consumer side, however, a new traceId and spanId are created.
The following two log lines show different traceId, spanId (and parentId) values:
2021-03-23 11:42:30.158 [http-nio-9185-exec-2] INFO my.company.Producer - /4afe07273872918b/4afe07273872918b// - Sending event='MyEvent'
2021-03-23 11:42:54.374 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO my.company.Consumer /1fec3bf6a3c91773/ff4bd26b2e509ed8/1fec3bf6a3c91773/ - Received new event='MyEvent'
First, using Kafdrop and debugging, I verified that the message headers do not contain any tracing information.
After that, I found that the method SleuthKafkaAspect.wrapProducerFactory() is never triggered, while on the consumer side the method SleuthKafkaAspect.anyConsumerFactory() is triggered.
The library versions in use are:
- spring boot: 2.3.7.RELEASE
- spring cloud bom: Hoxton.SR10
- spring cloud: 2.2.7.RELEASE (and 2.2.5.RELEASE)
- spring kafka: 2.5.10.RELEASE
- kafka client: 2.4.1
- spring-cloud-starter-sleuth: 2.2.7.RELEASE
- spring-cloud-sleuth-zipkin: 2.2.7.RELEASE
The kafka client is pinned at 2.4.1 because of a downgrade related to a production bug in the 2.5.1 kafka client that caused increased CPU usage.
I also tried the following library version combinations, without success:
- spring boot: 2.3.7.RELEASE
- spring cloud bom: Hoxton.SR10 (and Hoxton.SR8)
- spring cloud: 2.2.7.RELEASE (and 2.2.5.RELEASE)
- spring kafka: 2.5.10.RELEASE
- kafka client: 2.5.1
- spring-cloud-starter-sleuth: 2.2.7.RELEASE (and 2.2.5.RELEASE)
- spring-cloud-sleuth-zipkin: 2.2.7.RELEASE (and 2.2.5.RELEASE)
- spring boot: 2.3.7.RELEASE
- spring cloud bom: Hoxton.SR10 (and Hoxton.SR8)
- spring cloud: 2.2.7.RELEASE (and 2.2.5.RELEASE)
- spring kafka: 2.5.10.RELEASE
- kafka client: 2.6.0
- spring-cloud-starter-sleuth: 2.2.7.RELEASE (and 2.2.5.RELEASE)
- spring-cloud-sleuth-zipkin: 2.2.7.RELEASE (and 2.2.5.RELEASE)
- spring boot: 2.3.7.RELEASE
- spring cloud bom: Hoxton.SR10 (and Hoxton.SR8)
- spring cloud: 2.2.7.RELEASE (and 2.2.5.RELEASE)
- spring kafka: 2.6.x
- kafka client: 2.6.0
- spring-cloud-starter-sleuth: 2.2.7.RELEASE (and 2.2.5.RELEASE)
- spring-cloud-sleuth-zipkin: 2.2.7.RELEASE (and 2.2.5.RELEASE)
We migrated the project from Spring Boot 2.3.0.RELEASE to 2.3.7.RELEASE. Before the migration everything worked correctly.
The old library versions are below:
- spring-boot: 2.3.0.RELEASE
- spring-kafka: 2.5.0.RELEASE
- kafka-clients: 2.4.1
- spring-cloud: 2.2.5.RELEASE
- spring-cloud-starter-sleuth: 2.2.5.RELEASE
- spring-cloud-sleuth-zipkin: 2.2.5.RELEASE
We also introduced log4j2 (previously slf4j with logback).
The relevant libraries are below:
- org.springframework.boot:spring-boot-starter-log4j2:jar:2.3.7.RELEASE:compile
- org.slf4j:jul-to-slf4j:jar:1.7.30:compile
- io.projectreactor:reactor-test:jar:3.3.12.RELEASE:test
- io.projectreactor:reactor-core:jar:3.3.12.RELEASE:test
- org.reactivestreams:reactive-streams:jar:1.0.3:test
The configured properties are as follows:
spring.sleuth.messaging.enabled=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.client-id=myClientIdentifier
spring.kafka.consumer.group-id=MyConsumerGroup
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
The configuration class that creates the ProducerFactory is as follows:
@Configuration
@EnableTransactionManagement
public class KafkaProducerConfig {

    KafkaProperties kafkaProperties;

    @Autowired
    public KafkaProducerConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }

    private ProducerFactory<String, Object> producerFactory() {
        DefaultKafkaProducerFactory<String, Object> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(producerConfigs());
        //defaultKafkaProducerFactory.transactionCapable();
        //defaultKafkaProducerFactory.setTransactionIdPrefix("tx-");
        return defaultKafkaProducerFactory;
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> configs = kafkaProperties.buildProducerProperties();
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return configs;
    }
}
My Spring Boot application class:
@Profile("DEV")
@SpringBootApplication(
        scanBasePackages = {"my.company"},
        exclude = {
                DataSourceAutoConfiguration.class,
                DataSourceTransactionManagerAutoConfiguration.class,
                HibernateJpaAutoConfiguration.class
        }
)
@EnableSwagger2
@EnableFeignClients(basePackages = {"my.company.common", "my.company.integration"})
@EnableTransactionManagement
@EnableMongoRepositories(basePackages = {"my.company.repository"})
@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)
@ServletComponentScan
public class DevAppStartup extends SpringBootServletInitializer {

    public static void main(String[] args) {
        SpringApplication.run(DevAppStartup.class, args);
    }
}
Here you can find the output of the command "mvn dependency:tree":
mvn_dependency_tree.txt
As the documentation suggests, if you want to use your own KafkaTemplate, you need to create a ProducerFactory bean:
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
According to the Spring Sleuth documentation: https://docs.spring.io/spring-cloud-sleuth/docs/current-SNAPSHOT/reference/html/integrations.html#sleuth-kafka-integration
We decorate the Kafka clients (KafkaProducer and KafkaConsumer) to create a span for each event that is produced or consumed. You can disable this feature by setting the value of spring.sleuth.kafka.enabled to false.
You have to register the Producer or Consumer as beans in order for Sleuth’s auto-configuration to decorate them. When you then inject the beans, the expected type must be Producer or Consumer (and NOT e.g. KafkaProducer).
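To make the point concrete, here is a minimal sketch of a producer service that uses the bean-registered template. The class name, topic name, and event type are placeholders, not taken from the question's project; the idea is that once the ProducerFactory is a Spring bean, Sleuth's aspect can wrap it, so the injected KafkaTemplate sends records whose headers carry the trace context.

```java
// Hypothetical producer service ("my-topic" and the Object payload are placeholders).
// With the ProducerFactory registered as a bean, Sleuth wraps the underlying
// producer, and the trace context is injected into each record's headers, so the
// consumer continues the same trace instead of starting a new one.
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public Producer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(Object event) {
        // The wrapped producer adds the tracing headers before the record
        // reaches the broker; nothing tracing-specific is needed here.
        kafkaTemplate.send("my-topic", event);
    }
}
```

Note that the decoration happens on the factory bean, not on the template, which is why building the factory in a private method (as in the question's config) bypasses Sleuth entirely.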