在kafka生产者回调中添加trace_id
Add trace_id in kafka producer callback
需要在 kafka 生产者回调中添加来自 spring-sleuth 的跟踪 ID
我在我的 POM 中添加了一个 sleuth starter 并创建了一个 kafka-producer。我正在寻找一种方法将当前 trace_id 添加到回调中的日志。
ListenableFuture<SendResult<K, V>> send = kafkaTemplate.send(topic, data);
send.addCallback(
new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable throwable) {
log.error(
MessageFormat.format(
"Error when sending message {0} to Kafka", data.getGlobalUUID()),
throwable);
deferredResult.setErrorResult(
new MarkusKafkaInputException(
MessageFormat.format(
"Error proceed {0} with message: {1}",
data.getGlobalUUID(), throwable.getMessage())));
}
@Override
public void onSuccess(SendResult<K, V> result) {
log.trace(result.toString());
}
});
POM 中的 Sleuth 依赖项 -
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
我希望日志中有 trace_id,用于回调。或者没有办法异步获取trace_id?
找到了方法,但不确定是否正确
private static class TracingCallback<K, V> implements ListenableFutureCallback<SendResult<K, V>> {
private final Span span;
private final Tracer tracer;
private TracingCallback(Span span, Tracer tracer) {
this.span = span;
this.tracer = tracer;
}
@Override
public void onFailure(Throwable throwable) {
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
log.error("test111");
} finally {
span.finish();
}
}
@Override
public void onSuccess(SendResult<K, V> kvSendResult) {
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
log.error("test111");
} finally {
span.finish();
}
}
}
并在代码中使用 -
@Autowired private Tracer tracer;
...
ListenableFuture<SendResult<K, V>> send = kafkaTemplate.send(topic, data);
send.addCallback(new TracingCallback<>(tracer.currentSpan(), tracer));
...
需要在 kafka 生产者回调中添加来自 spring-sleuth 的跟踪 ID
我在我的 POM 中添加了一个 sleuth starter 并创建了一个 kafka-producer。我正在寻找一种方法将当前 trace_id 添加到回调中的日志。
ListenableFuture<SendResult<K, V>> send = kafkaTemplate.send(topic, data);
send.addCallback(
new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable throwable) {
log.error(
MessageFormat.format(
"Error when sending message {0} to Kafka", data.getGlobalUUID()),
throwable);
deferredResult.setErrorResult(
new MarkusKafkaInputException(
MessageFormat.format(
"Error proceed {0} with message: {1}",
data.getGlobalUUID(), throwable.getMessage())));
}
@Override
public void onSuccess(SendResult<K, V> result) {
log.trace(result.toString());
}
});
POM 中的 Sleuth 依赖项 -
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
我希望日志中有 trace_id,用于回调。或者没有办法异步获取trace_id?
找到了方法,但不确定是否正确
private static class TracingCallback<K, V> implements ListenableFutureCallback<SendResult<K, V>> {
private final Span span;
private final Tracer tracer;
private TracingCallback(Span span, Tracer tracer) {
this.span = span;
this.tracer = tracer;
}
@Override
public void onFailure(Throwable throwable) {
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
log.error("test111");
} finally {
span.finish();
}
}
@Override
public void onSuccess(SendResult<K, V> kvSendResult) {
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
log.error("test111");
} finally {
span.finish();
}
}
}
并在代码中使用 -
@Autowired private Tracer tracer;
...
ListenableFuture<SendResult<K, V>> send = kafkaTemplate.send(topic, data);
send.addCallback(new TracingCallback<>(tracer.currentSpan(), tracer));
...