测量 Project Reactor 中的执行时间

Measure execution time in Project Reactor

我正在尝试对我的一个项目进行负载测试,我认为在我的更改中的某个地方我可能引入了一些性能受到影响的点。

有没有办法测量执行时间log以防超过阈值?

flux
  .<doOnSomethingIsTakingTooLong>(signal -> 
       log.error("SLOW PROCESSING {}", signal), Duration.ofSeconds(2)

有几种实现方式:

  1. 公开指标

这与您的要求不完全相同,但指标是收集此类信息的首选方式。 Reactor 公开了多个有助于调查性能问题的指标。您可以将 metrics() 运算符与 name() & tag() 结合使用来发出指标。有关详细信息,请参阅 Exposing Reactor metrics

events()
    .flatMap(rec ->
            processEvent(rec)
              .name("process") 
              .metrics() 
    )

您还可以启用调度程序指标 Schedulers.enableMetrics();

这个项目有很好的基于Prometheus和Grafana监控的例子Reactor monitoring demo

  1. 使用上下文来衡量执行时间

想法是在上下文中保存开始时间,然后在发出完成事件时计算执行时间。

events()
    .flatMap(rec ->
            processEvent(rec)
                 .transform(this::reportExecutionTime)
    );

其中 reportExecutionTime 的实现方式如下

private <T> Mono<T> reportExecutionTime(Mono<T> mono) {
    String taskStartMsKey = "task.start";

    return Mono.deferContextual(ctx -> mono
                    .doOnSuccess(__ -> {
                        var executionTime = currentTimeMillis() - ctx.<Long>get(taskStartMsKey);
                        log.info("execution time: {}", executionTime);
                    })
            )
            .contextWrite(ctx -> ctx.put(taskStartMsKey, currentTimeMillis()));
}

这是一个完整的测试示例

@Test
void reportExecutionTimeTest() {
    var stream = events()
            .flatMap(rec ->
                    processEvent(rec)
                            .transform(this::reportExecutionTime)
            );

    StepVerifier.create(stream)
            .expectNextCount(100)
            .verifyComplete();
}

private <T> Mono<T> reportExecutionTime(Mono<T> mono) {
    String taskStartMsKey = "task.start";

    return Mono.deferContextual(ctx -> mono
                    .doOnSuccess(__ -> {
                        var executionTime = currentTimeMillis() - ctx.<Long>get(taskStartMsKey);
                        log.info("execution time: {}", executionTime);
                    })
            )
            .contextWrite(ctx -> ctx.put(taskStartMsKey, currentTimeMillis()));
}

private Flux<Integer> events() {
    return Flux.range(1, 100);
}

private Mono<Integer> processEvent(int i) {
    return Mono.delay(Duration.ofMillis(i + 100))
            .thenReturn(i);
}