在kafka生产者回调中添加trace_id

Add trace_id in kafka producer callback

需要在 kafka 生产者回调中添加来自 spring-sleuth 的跟踪 ID

我在我的 POM 中添加了一个 sleuth starter 并创建了一个 kafka-producer。我正在寻找一种方法将当前 trace_id 添加到回调中的日志。

ListenableFuture<SendResult<K, V>> send = kafkaTemplate.send(topic, data);
    send.addCallback(
        new ListenableFutureCallback<>() {
          @Override
          public void onFailure(Throwable throwable) {
            log.error(
                MessageFormat.format(
                    "Error when sending message {0} to Kafka", data.getGlobalUUID()),
                throwable);
            deferredResult.setErrorResult(
                new MarkusKafkaInputException(
                    MessageFormat.format(
                        "Error proceed {0} with message: {1}",
                        data.getGlobalUUID(), throwable.getMessage())));
          }

          @Override
          public void onSuccess(SendResult<K, V> result) {
            log.trace(result.toString());
          }
        });

POM 中的 Sleuth 依赖项 -

       <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
            <version>2.1.2.RELEASE</version>
        </dependency>

我希望日志中有 trace_id,用于回调。或者没有办法异步获取trace_id?

找到了方法,但不确定是否正确

private static class TracingCallback<K, V> implements ListenableFutureCallback<SendResult<K, V>> {

    private final Span span;
    private final Tracer tracer;

    private TracingCallback(Span span, Tracer tracer) {
      this.span = span;
      this.tracer = tracer;
    }

    @Override
    public void onFailure(Throwable throwable) {
      try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
        log.error("test111");
      } finally {
        span.finish();
      }
    }

    @Override
    public void onSuccess(SendResult<K, V> kvSendResult) {
      try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
        log.error("test111");
      } finally {
        span.finish();
      }
    }
  }

并在代码中使用 -

@Autowired private Tracer tracer;

...

ListenableFuture<SendResult<K, V>> send = kafkaTemplate.send(topic, data);
    send.addCallback(new TracingCallback<>(tracer.currentSpan(), tracer));
...