Configure to start new span in KafkaMessageListenerContainer not on listener (Kafka + Sleuth)
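I'm new to Kafka + Sleuth. My current problem is that the span is created in MessageListenerMethodInterceptor, which fires on the KafkaListener, so if an exception occurs we lose the spanId and traceId in the ErrorHandler. Is it possible to configure Sleuth to start the span in KafkaMessageListenerContainer instead?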
The record interceptor is the best place for instrumentation; a recent improvement widened its scope to include the error handler.
I worked it out. It runs fine on my local machine, but I still have some doubts...
I disabled MessageListenerMethodInterceptor by setting spring.sleuth.messaging.kafka.enabled=false in application.properties.
I created a record interceptor, an implementation of org.springframework.kafka.listener.RecordInterceptor, as follows:
    private final Tracer tracer;
    private final KafkaTracing kafkaTracing;
    private static final ThreadLocal<Span> CURRENT_SPAN = new ThreadLocal<>();
    private static final ThreadLocal<Tracer.SpanInScope> CURRENT_SPAN_IN_SCOPE = new ThreadLocal<>();

    @Override
    public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
        openSpan(record);
        return RecordInterceptor.super.intercept(record, consumer);
    }

    @Override
    public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
        Span span = CURRENT_SPAN.get();
        if (span != null) {
            String message = Optional.ofNullable(exception.getMessage()).orElseGet(() -> exception.getClass().getSimpleName());
            span.tag("error", message);
        }
    }

    @Override
    public void clearThreadState(Consumer<?, ?> consumer) {
        closeSpan();
    }

    private void openSpan(ConsumerRecord<K, V> record) {
        Span span = kafkaTracing.nextSpan(record).name("kafka-tracing").start();
        Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span);
        CURRENT_SPAN.set(span);
        CURRENT_SPAN_IN_SCOPE.set(spanInScope);
        if (LOG.isDebugEnabled()) {
            LOG.debug("created span {}", span);
        }
    }

    private void closeSpan() {
        Span span = CURRENT_SPAN.get();
        if (span != null) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("finishing span {}", span);
                }
                span.finish();
            } catch (Exception ex) {
                LOG.warn("on finishing Span {} got error {}", span, ex.getMessage());
            } finally {
                CURRENT_SPAN.set(null);
            }
        }
        Tracer.SpanInScope spanInScope = CURRENT_SPAN_IN_SCOPE.get();
        if (spanInScope != null) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("closing SpanInScope {}", spanInScope);
                }
                spanInScope.close();
            } catch (Exception ex) {
                LOG.warn("on closing SpanInScope {} got error {}", spanInScope, ex.getMessage());
            } finally {
                CURRENT_SPAN_IN_SCOPE.set(null);
            }
        }
    }
The other methods are empty.
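The reason this keeps the trace available in the error handler can be illustrated with a small library-free model. This is only a sketch: the class ContainerLifecycleModel and all of its methods are hypothetical stand-ins, not the Spring Kafka or Brave API, and it assumes the container calls the interceptor before the listener, runs the error handler on the same thread, and clears thread state last.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Simplified, library-free model of the callback order assumed above.
 * Every name here is hypothetical; this is not the Spring Kafka or Brave API.
 */
final class ContainerLifecycleModel {

    // Stands in for the span that the interceptor keeps in a ThreadLocal.
    private static final ThreadLocal<String> CURRENT_TRACE_ID = new ThreadLocal<>();

    // Trace ids observed by the error handler, recorded for inspection.
    static final List<String> SEEN_BY_ERROR_HANDLER = new ArrayList<>();

    // Models intercept(): opens the "span" before the listener runs.
    static void intercept(String record) {
        CURRENT_TRACE_ID.set(UUID.randomUUID().toString());
    }

    // Models the error handler: reads whatever trace id is still in scope.
    static void errorHandler(String record, Exception ex) {
        SEEN_BY_ERROR_HANDLER.add(CURRENT_TRACE_ID.get());
    }

    // Models clearThreadState(): closes the "span" after everything else.
    static void clearThreadState() {
        CURRENT_TRACE_ID.remove();
    }

    static String currentTraceId() {
        return CURRENT_TRACE_ID.get();
    }

    /** Mimics one pass of the container loop for a single record. */
    static void processRecord(String record, boolean listenerFails) {
        intercept(record);
        try {
            if (listenerFails) {
                // Stands in for an exception thrown by the listener method.
                throw new IllegalStateException("listener failed for " + record);
            }
        } catch (Exception ex) {
            errorHandler(record, ex);
        } finally {
            clearThreadState();
        }
    }
}
```

In this model, calling ContainerLifecycleModel.processRecord("r1", true) records one non-null trace id in SEEN_BY_ERROR_HANDLER, and currentTraceId() returns null afterwards, because the cleanup step runs only after the error handler has finished.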
The bean for this interceptor is created as follows:
    @Bean
    public KafkaTracingRecordInterceptor<Object, Object> kafkaRecordInterceptor(Tracer tracer, Tracing tracing) {
        return new KafkaTracingRecordInterceptor<>(tracer, KafkaTracing.create(tracing));
    }
P.S. If I missed something or implemented this the wrong way, please add a comment.