如何使用 Micrometer Timer 记录异步方法的持续时间(returns Mono 或 Flux)
How to use Micrometer Timer to record duration of async method (returns Mono or Flux)
我想使用 Micrometer 来记录异步方法最终发生时的执行时间。有推荐的方法吗?
示例:Kafka 回复模板。我想记录实际执行 sendAndReceive 调用(在请求主题上发送消息并在回复主题上接收响应)所花费的时间。
public Mono<String> sendRequest(Mono<String> request) {
return request
.map(r -> new ProducerRecord<String, String>(requestsTopic, r))
.map(pr -> {
pr.headers()
.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
"reply-topic".getBytes()));
return pr;
})
.map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
... // further maps, filters, etc.
类似于
responseGenerationTimer.record(() -> replyingKafkaTemplate.sendAndReceive(pr)))
不会在这里工作;它只记录创建 Supplier
所需的时间,而不是实际执行时间。
看起来 recordCallable
正如 Brian Clozel 所建议的那样是答案。我写了一个快速测试来验证这一点:
import io.micrometer.core.instrument.Timer;
import reactor.core.publisher.Mono;
public class Capitalizer {
private final Timer timer;
public Capitalizer(Timer timer) {
this.timer = timer;
}
public Mono<String> capitalize(Mono<String> val) {
return val.flatMap(v -> {
try {
return timer.recordCallable(() -> toUpperCase(v));
} catch (Exception e) {
e.printStackTrace();
return null;
}
}).filter(r -> r != null);
}
private Mono<String> toUpperCase(String val) throws InterruptedException {
Thread.sleep(1000);
return Mono.just(val.toUpperCase());
}
}
并对此进行测试:
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.concurrent.TimeUnit;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
public class CapitalizerTest {
private static final Logger logger =
LoggerFactory.getLogger(CapitalizerTest.class);
private Capitalizer capitalizer;
private Timer timer;
@Before
public void setUp() {
timer = new SimpleMeterRegistry().timer("test");
capitalizer = new Capitalizer(timer);
}
@Test
public void testCapitalize() {
String val = "Foo";
Mono<String> inputMono = Mono.just(val);
Mono<String> mono = capitalizer.capitalize(inputMono);
mono.subscribe(v -> logger.info("Capitalized {} to {}", val, v));
assertEquals(1, timer.count());
logger.info("Timer executed in {} ms",
timer.totalTime(TimeUnit.MILLISECONDS));
assertTrue(timer.totalTime(TimeUnit.MILLISECONDS) > 1000);
}
}
计时器报告执行时间大约为 1004 毫秒,延迟为 1000 毫秒,没有延迟为 4 毫秒。
您可以执行以下操作:
// Mono<Something> mono = ...
Timer.Sample sample = Timer.start(Clock.SYSTEM); // or use clock of registry
return mono.doOnNext(x -> sample.stop(timer));
示例文档请参见此处:http://micrometer.io/docs/concepts#_storing_start_state_in_code_timer_sample_code
要获得更好的方法,您还可以查看 resilience4j 他们通过变换装饰单声道:https://github.com/resilience4j/resilience4j/tree/master/resilience4j-reactor
你可以使用 reactor.util.context.Context
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static org.hamcrest.Matchers.is;
public class TestMonoTimer {
private static final Logger LOG = LoggerFactory.getLogger(TestMonoTimer.class);
private static final String TIMER_SAMPLE = "TIMER_SAMPLE";
private static final Timer TIMER = new SimpleMeterRegistry().timer("test");
private static final AtomicBoolean EXECUTION_FLAG = new AtomicBoolean();
@Test
public void testMonoTimer() {
Mono.fromCallable(() -> {
Thread.sleep(1234);
return true;
}).transform(timerTransformer(TIMER))
.subscribeOn(Schedulers.parallel())
.subscribe(EXECUTION_FLAG::set);
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(EXECUTION_FLAG, is(true));
Assert.assertTrue(TIMER.totalTime(TimeUnit.SECONDS) > 1);
}
private static <T> Function<Mono<T>, Publisher<T>> timerTransformer(Timer timer) {
return mono -> mono
.flatMap(t -> Mono.subscriberContext()
.flatMap(context -> Mono.just(context.<Timer.Sample>get(TIMER_SAMPLE).stop(timer))
.doOnNext(duration -> LOG.info("Execution time is [{}] seconds",
duration / 1000000000D))
.map(ignored -> t)))
.subscriberContext(context -> context.put(TIMER_SAMPLE, Timer.start(Clock.SYSTEM)));
}
}
我使用了以下内容:
private <T> Publisher<T> time(String metricName, Flux<T> publisher) {
return Flux.defer(() -> {
long before = System.currentTimeMillis();
return publisher.doOnNext(next -> Metrics.timer(metricName)
.record(System.currentTimeMillis() - before, TimeUnit.MILLISECONDS));
});
}
所以要在实践中使用它:
Flux.just(someValue)
.flatMap(val -> time("myMetricName", aTaskThatNeedsTimed(val))
.subscribe(val -> {})
您可以只使用 Mono/Flux() 中的 metrics()(查看此处的 metrics():https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html)
然后你可以做类似
的事情
public Mono<String> sendRequest(Mono<String> request) {
return request
.map(r -> new ProducerRecord<String, String>(requestsTopic, r))
.map(pr -> {
pr.headers()
.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
"reply-topic".getBytes()));
return pr;
})
.map(pr -> replyingKafkaTemplate.sendAndReceive(pr)).name("my-metricsname").metrics()
例如在 Graphite 中,您将看到此调用的延迟测量值(您可以在此处查看更多信息:How to use Micrometer timer together with webflux endpoints)
您可以使用 metrics()
计算时间间隔的方法 b/w subscribe()
和 onComplete()
。你可以这样做,
.metrics().elapsed().doOnNext(tuple -> log.info("get response time: " + tuple.getT1() + "ms")).map(Tuple2::getT2);
如果您考虑使用 metrics()
,请理解即使您调用 Mono.name()
.
也不会创建新的 Meter
根据您的情况,您有三种选择。
- 使用
metrics()
- 好吧,如果您考虑使用
metrics()
,请理解即使您调用 Mono.name()
. 也不会创建新的 Meter
- 在
doOnNext
中记录时间并进行时间计算。
- 使用
规定的 subscriptionContext
就个人而言,我想使用方法 3。
我想使用 Micrometer 来记录异步方法最终发生时的执行时间。有推荐的方法吗?
示例:Kafka 回复模板。我想记录实际执行 sendAndReceive 调用(在请求主题上发送消息并在回复主题上接收响应)所花费的时间。
public Mono<String> sendRequest(Mono<String> request) {
return request
.map(r -> new ProducerRecord<String, String>(requestsTopic, r))
.map(pr -> {
pr.headers()
.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
"reply-topic".getBytes()));
return pr;
})
.map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
... // further maps, filters, etc.
类似于
responseGenerationTimer.record(() -> replyingKafkaTemplate.sendAndReceive(pr)))
不会在这里工作;它只记录创建 Supplier
所需的时间,而不是实际执行时间。
看起来 recordCallable
正如 Brian Clozel 所建议的那样是答案。我写了一个快速测试来验证这一点:
import io.micrometer.core.instrument.Timer;
import reactor.core.publisher.Mono;
public class Capitalizer {
private final Timer timer;
public Capitalizer(Timer timer) {
this.timer = timer;
}
public Mono<String> capitalize(Mono<String> val) {
return val.flatMap(v -> {
try {
return timer.recordCallable(() -> toUpperCase(v));
} catch (Exception e) {
e.printStackTrace();
return null;
}
}).filter(r -> r != null);
}
private Mono<String> toUpperCase(String val) throws InterruptedException {
Thread.sleep(1000);
return Mono.just(val.toUpperCase());
}
}
并对此进行测试:
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.concurrent.TimeUnit;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
public class CapitalizerTest {
private static final Logger logger =
LoggerFactory.getLogger(CapitalizerTest.class);
private Capitalizer capitalizer;
private Timer timer;
@Before
public void setUp() {
timer = new SimpleMeterRegistry().timer("test");
capitalizer = new Capitalizer(timer);
}
@Test
public void testCapitalize() {
String val = "Foo";
Mono<String> inputMono = Mono.just(val);
Mono<String> mono = capitalizer.capitalize(inputMono);
mono.subscribe(v -> logger.info("Capitalized {} to {}", val, v));
assertEquals(1, timer.count());
logger.info("Timer executed in {} ms",
timer.totalTime(TimeUnit.MILLISECONDS));
assertTrue(timer.totalTime(TimeUnit.MILLISECONDS) > 1000);
}
}
计时器报告执行时间大约为 1004 毫秒,延迟为 1000 毫秒,没有延迟为 4 毫秒。
您可以执行以下操作:
// Mono<Something> mono = ...
Timer.Sample sample = Timer.start(Clock.SYSTEM); // or use clock of registry
return mono.doOnNext(x -> sample.stop(timer));
示例文档请参见此处:http://micrometer.io/docs/concepts#_storing_start_state_in_code_timer_sample_code
要获得更好的方法,您还可以查看 resilience4j 他们通过变换装饰单声道:https://github.com/resilience4j/resilience4j/tree/master/resilience4j-reactor
你可以使用 reactor.util.context.Context
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static org.hamcrest.Matchers.is;
public class TestMonoTimer {
private static final Logger LOG = LoggerFactory.getLogger(TestMonoTimer.class);
private static final String TIMER_SAMPLE = "TIMER_SAMPLE";
private static final Timer TIMER = new SimpleMeterRegistry().timer("test");
private static final AtomicBoolean EXECUTION_FLAG = new AtomicBoolean();
@Test
public void testMonoTimer() {
Mono.fromCallable(() -> {
Thread.sleep(1234);
return true;
}).transform(timerTransformer(TIMER))
.subscribeOn(Schedulers.parallel())
.subscribe(EXECUTION_FLAG::set);
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(EXECUTION_FLAG, is(true));
Assert.assertTrue(TIMER.totalTime(TimeUnit.SECONDS) > 1);
}
private static <T> Function<Mono<T>, Publisher<T>> timerTransformer(Timer timer) {
return mono -> mono
.flatMap(t -> Mono.subscriberContext()
.flatMap(context -> Mono.just(context.<Timer.Sample>get(TIMER_SAMPLE).stop(timer))
.doOnNext(duration -> LOG.info("Execution time is [{}] seconds",
duration / 1000000000D))
.map(ignored -> t)))
.subscriberContext(context -> context.put(TIMER_SAMPLE, Timer.start(Clock.SYSTEM)));
}
}
我使用了以下内容:
private <T> Publisher<T> time(String metricName, Flux<T> publisher) {
return Flux.defer(() -> {
long before = System.currentTimeMillis();
return publisher.doOnNext(next -> Metrics.timer(metricName)
.record(System.currentTimeMillis() - before, TimeUnit.MILLISECONDS));
});
}
所以要在实践中使用它:
Flux.just(someValue)
.flatMap(val -> time("myMetricName", aTaskThatNeedsTimed(val))
.subscribe(val -> {})
您可以只使用 Mono/Flux() 中的 metrics()(查看此处的 metrics():https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html) 然后你可以做类似
的事情public Mono<String> sendRequest(Mono<String> request) {
return request
.map(r -> new ProducerRecord<String, String>(requestsTopic, r))
.map(pr -> {
pr.headers()
.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
"reply-topic".getBytes()));
return pr;
})
.map(pr -> replyingKafkaTemplate.sendAndReceive(pr)).name("my-metricsname").metrics()
例如在 Graphite 中,您将看到此调用的延迟测量值(您可以在此处查看更多信息:How to use Micrometer timer together with webflux endpoints)
您可以使用 metrics()
计算时间间隔的方法 b/w subscribe()
和 onComplete()
。你可以这样做,
.metrics().elapsed().doOnNext(tuple -> log.info("get response time: " + tuple.getT1() + "ms")).map(Tuple2::getT2);
如果您考虑使用 metrics()
,请理解即使您调用 Mono.name()
.
根据您的情况,您有三种选择。
- 使用
metrics()
- 好吧,如果您考虑使用
metrics()
,请理解即使您调用Mono.name()
. 也不会创建新的 Meter
- 好吧,如果您考虑使用
- 在
doOnNext
中记录时间并进行时间计算。 - 使用
就个人而言,我想使用方法 3。