如何使用 Micrometer Timer 记录异步方法的持续时间(returns Mono 或 Flux)

How to use Micrometer Timer to record duration of async method (returns Mono or Flux)

我想使用 Micrometer 来记录异步方法最终发生时的执行时间。有推荐的方法吗?

示例:Kafka 回复模板。我想记录实际执行 sendAndReceive 调用(在请求主题上发送消息并在回复主题上接收响应)所花费的时间。

    public Mono<String> sendRequest(Mono<String> request) {
        return request
            .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
            .map(pr -> {
                pr.headers()
                        .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                                "reply-topic".getBytes()));
                return pr;
            })
            .map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
            ... // further maps, filters, etc.

类似于

responseGenerationTimer.record(() -> replyingKafkaTemplate.sendAndReceive(pr)))

不会在这里工作;它只记录创建 Supplier 所需的时间,而不是实际执行时间。

看起来 recordCallable 正如 Brian Clozel 所建议的那样是答案。我写了一个快速测试来验证这一点:

import io.micrometer.core.instrument.Timer;
import reactor.core.publisher.Mono;

public class Capitalizer {

    private final Timer timer;

    public Capitalizer(Timer timer) {
        this.timer = timer;
    }

    public Mono<String> capitalize(Mono<String> val) {
        return val.flatMap(v -> {
            try {
                return timer.recordCallable(() -> toUpperCase(v));
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }).filter(r -> r != null);
    }

    private Mono<String> toUpperCase(String val) throws InterruptedException {
        Thread.sleep(1000);
        return Mono.just(val.toUpperCase());
    }
}

并对此进行测试:

import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;

import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;

public class CapitalizerTest {

    private static final Logger logger =
        LoggerFactory.getLogger(CapitalizerTest.class);

    private Capitalizer capitalizer;
    private Timer timer;

    @Before
    public void setUp() {
        timer = new SimpleMeterRegistry().timer("test");
        capitalizer = new Capitalizer(timer);
    }

    @Test
    public void testCapitalize() {
        String val = "Foo";
        Mono<String> inputMono = Mono.just(val);
        Mono<String> mono = capitalizer.capitalize(inputMono);
        mono.subscribe(v -> logger.info("Capitalized {} to {}", val, v));
        assertEquals(1, timer.count());
        logger.info("Timer executed in {} ms",
            timer.totalTime(TimeUnit.MILLISECONDS));
        assertTrue(timer.totalTime(TimeUnit.MILLISECONDS) > 1000);
    }
}

计时器报告执行时间大约为 1004 毫秒,延迟为 1000 毫秒,没有延迟为 4 毫秒。

您可以执行以下操作:

// Mono<Something> mono = ...
Timer.Sample sample = Timer.start(Clock.SYSTEM); // or use clock of registry
return mono.doOnNext(x -> sample.stop(timer));

示例文档请参见此处:http://micrometer.io/docs/concepts#_storing_start_state_in_code_timer_sample_code

要获得更好的方法,您还可以查看 resilience4j 他们通过变换装饰单声道:https://github.com/resilience4j/resilience4j/tree/master/resilience4j-reactor

你可以使用 reactor.util.context.Context

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.hamcrest.Matchers.is;

public class TestMonoTimer {
    private static final Logger LOG = LoggerFactory.getLogger(TestMonoTimer.class);

    private static final String TIMER_SAMPLE = "TIMER_SAMPLE";
    private static final Timer TIMER = new SimpleMeterRegistry().timer("test");
    private static final AtomicBoolean EXECUTION_FLAG = new AtomicBoolean();

    @Test
    public void testMonoTimer() {
        Mono.fromCallable(() -> {
            Thread.sleep(1234);
            return true;
        }).transform(timerTransformer(TIMER))
                .subscribeOn(Schedulers.parallel())
                .subscribe(EXECUTION_FLAG::set);

        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(EXECUTION_FLAG, is(true));
        Assert.assertTrue(TIMER.totalTime(TimeUnit.SECONDS) > 1);
    }

    private static <T> Function<Mono<T>, Publisher<T>> timerTransformer(Timer timer) {
        return mono -> mono
                .flatMap(t -> Mono.subscriberContext()
                        .flatMap(context -> Mono.just(context.<Timer.Sample>get(TIMER_SAMPLE).stop(timer))
                                .doOnNext(duration -> LOG.info("Execution time is [{}] seconds",
                                        duration / 1000000000D))
                                .map(ignored -> t)))
                .subscriberContext(context -> context.put(TIMER_SAMPLE, Timer.start(Clock.SYSTEM)));
    }
}

我使用了以下内容:

private <T> Publisher<T> time(String metricName, Flux<T> publisher) {
  return Flux.defer(() -> {

  long before = System.currentTimeMillis();
  return publisher.doOnNext(next -> Metrics.timer(metricName)
        .record(System.currentTimeMillis() - before, TimeUnit.MILLISECONDS));
  });
}

所以要在实践中使用它:

Flux.just(someValue)
  .flatMap(val -> time("myMetricName", aTaskThatNeedsTimed(val))
  .subscribe(val -> {})

您可以只使用 Mono/Flux() 中的 metrics()(查看此处的 metrics():https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html) 然后你可以做类似

的事情
public Mono<String> sendRequest(Mono<String> request) {
    return request
        .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
        .map(pr -> {
            pr.headers()
                    .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                            "reply-topic".getBytes()));
            return pr;
        })
        .map(pr -> replyingKafkaTemplate.sendAndReceive(pr)).name("my-metricsname").metrics()

例如在 Graphite 中,您将看到此调用的延迟测量值(您可以在此处查看更多信息:How to use Micrometer timer together with webflux endpoints

您可以使用 metrics() 计算时间间隔的方法 b/w subscribe()onComplete()。你可以这样做,

 .metrics().elapsed().doOnNext(tuple -> log.info("get response time: " + tuple.getT1() + "ms")).map(Tuple2::getT2);

如果您考虑使用 metrics(),请理解即使您调用 Mono.name().

也不会创建新的 Meter

根据您的情况,您有三种选择。

  1. 使用metrics()
    • 好吧,如果您考虑使用 metrics(),请理解即使您调用 Mono.name().
    • 也不会创建新的 Meter
  2. doOnNext中记录时间并进行时间计算。
  3. 使用
  4. 规定的 subscriptionContext

就个人而言,我想使用方法 3