Configure Sleuth to start a new span in KafkaMessageListenerContainer, not on the listener (Kafka + Sleuth)

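I'm new to Kafka + Sleuth. I've run into a problem: the span is created in MessageListenerMethodInterceptor, which fires on the KafkaListener, so if an exception is thrown we lose the spanId and traceId in the ErrorHandler. Is it possible to configure Sleuth to start the span in KafkaMessageListenerContainer instead?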

The record interceptor is the best place for instrumentation; a recent improvement widened its scope to include the error handler.

https://github.com/spring-projects/spring-kafka/pull/1946
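
To illustrate why an interceptor-level span stays visible in the error handler, here is a minimal plain-Java sketch. It is a simplified model, not the Spring Kafka API: `MiniInterceptor`, `process`, and `CURRENT_TRACE_ID` are hypothetical names, and the call order (intercept, listener, failure, error handler, clear thread state) is an assumption based on the statement above that the interceptor's scope now includes the error handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class InterceptorScopeSketch {

    // Stand-in for the per-thread tracing context a tracer would maintain (hypothetical).
    static final ThreadLocal<String> CURRENT_TRACE_ID = new ThreadLocal<>();

    // Hypothetical, reduced set of interceptor callbacks; not the real RecordInterceptor.
    interface MiniInterceptor {
        String intercept(String record);
        void failure(String record, Exception ex);
        void clearThreadState();
    }

    // Hypothetical container step: the interceptor wraps both the listener
    // call and the error handler call, and cleanup always runs last.
    static void process(String record,
                        MiniInterceptor interceptor,
                        Consumer<String> listener,
                        BiConsumer<String, Exception> errorHandler) {
        try {
            String intercepted = interceptor.intercept(record);
            try {
                listener.accept(intercepted);
            } catch (RuntimeException ex) {
                interceptor.failure(intercepted, ex);
                errorHandler.accept(intercepted, ex);
            }
        } finally {
            interceptor.clearThreadState();
        }
    }

    // Runs one failing record and records which trace id each stage could see.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        MiniInterceptor interceptor = new MiniInterceptor() {
            @Override
            public String intercept(String record) {
                CURRENT_TRACE_ID.set("trace-for-" + record); // "open the span"
                return record;
            }

            @Override
            public void failure(String record, Exception ex) {
                seen.add("failure:" + CURRENT_TRACE_ID.get());
            }

            @Override
            public void clearThreadState() {
                CURRENT_TRACE_ID.remove(); // "close the span"
            }
        };

        process("r1", interceptor,
                r -> { throw new IllegalStateException("boom"); },
                (r, ex) -> seen.add("errorHandler:" + CURRENT_TRACE_ID.get()));

        seen.add("after:" + CURRENT_TRACE_ID.get());
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the error handler still sees the trace id because cleanup happens only in the outer `finally`, and nothing leaks to the next record on the same thread.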

I've got it working. It runs fine on my local machine, but I have some doubts...

  1. I disabled MessageListenerMethodInterceptor by setting spring.sleuth.messaging.kafka.enabled=false in application.properties.
  2. I created a record interceptor, an implementation of org.springframework.kafka.listener.RecordInterceptor, as follows:

     private final Tracer tracer;
     private final KafkaTracing kafkaTracing;
    
     private static final ThreadLocal<Span> CURRENT_SPAN = new ThreadLocal<>();
     private static final ThreadLocal<Tracer.SpanInScope> CURRENT_SPAN_IN_SCOPE = new ThreadLocal<>();
    
     @Override
     public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
         openSpan(record);
         return RecordInterceptor.super.intercept(record, consumer);
     }
    
     @Override
     public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
         Span span = CURRENT_SPAN.get();
         if (span != null) {
             String message = Optional.ofNullable(exception.getMessage()).orElseGet(() -> exception.getClass().getSimpleName());
             span.tag("error", message);
         }
     }
    
     @Override
     public void clearThreadState(Consumer<?, ?> consumer) {
         closeSpan();
     }
    
     private void openSpan(ConsumerRecord<K, V> record) {
         Span span = kafkaTracing.nextSpan(record).name("kafka-tracing").start();
         Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span);
         CURRENT_SPAN.set(span);
         CURRENT_SPAN_IN_SCOPE.set(spanInScope);
         if (LOG.isDebugEnabled()) {
             LOG.debug("created span {}", span);
         }
     }
    
     private void closeSpan() {
         Span span = CURRENT_SPAN.get();
         if (span != null) {
             try {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("finishing span {}", span);
                 }
                 span.finish();
             } catch (Exception ex) {
                 LOG.warn("on finishing Span {} got error {}", span, ex.getMessage());
             } finally {
                 CURRENT_SPAN.set(null);
             }
         }
    
         Tracer.SpanInScope spanInScope = CURRENT_SPAN_IN_SCOPE.get();
         if (spanInScope != null) {
             try {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("closing SpanInScope {}", spanInScope);
                 }
                 spanInScope.close();
             } catch (Exception ex) {
                 LOG.warn("on closing SpanInScope {} got error {}", spanInScope, ex.getMessage());
             } finally {
                 CURRENT_SPAN_IN_SCOPE.set(null);
             }
         }
     }
    

The other methods are empty.

The bean for this interceptor is created as follows:

    @Bean
    public KafkaTracingRecordInterceptor<Object, Object> kafkaRecordInterceptor(Tracer tracer, Tracing tracing) {
        return new KafkaTracingRecordInterceptor<>(tracer, KafkaTracing.create(tracing));
    }

P.S. If I've missed something or implemented this the wrong way, please add a comment.