How to calculate the processing time in Rx

For the following flow, I would like to know how to measure the time it takes to process all the data in forEach(...).

Observable
  .just(1, 2, 3)
  .flatMap(it -> {})
  .toBlocking()
  .forEach(it -> { /* some parsing logic here */ });

Edit

After reading this tutorial: Leaving the Monad, I think the simple solution is to do the following. Please let me know if I am missing something.

List items = Observable
      .just(1, 2, 3)
      .flatMap(it -> {})
      .toList()
      .toBlocking()
      .single(); // toList() returns an Observable, so block to get the List itself

long startTime = System.currentTimeMillis();

for (Object it : items) {
  // some parsing here
}

long processingTime = System.currentTimeMillis() - startTime;
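
The loop-timing idea above can be tried without RxJava. The sketch below assumes the items have already been collected into a list, and it uses System.nanoTime(), which is intended for measuring elapsed time, where System.currentTimeMillis() follows the wall clock. The names LoopTiming and timeLoop are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of RxJava.
final class LoopTiming {
    private LoopTiming() {}

    // Runs `parser` on every item and returns the elapsed time in nanoseconds.
    static <T> long timeLoop(List<T> items, Consumer<? super T> parser) {
        long start = System.nanoTime();
        for (T item : items) {
            parser.accept(item);
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3);
        long nanos = timeLoop(items, it -> { /* some parsing here */ });
        System.out.println("took " + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms");
    }
}
```

This only covers the loop itself; the time spent producing the list is deliberately left out, as in the snippet above.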

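I think this is what you want. Starting from your code, I separated the production of the values, Observable.range (which should correspond to the Observable.just in your example), from the pipeline to measure. In this case I added some fake computation.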

The idea is to wrap the pipeline you want to measure inside a flatMap and to add a stopwatch inside that single flatMap.

Observable.range(1, 10_000)
        .nest()
        .flatMap(
                o -> {
                    // Created once, so the fake delays actually vary between items.
                    Random random = new Random(73);
                    Observable<Integer> pipelineToMeasure = o.flatMap(i -> {
                        try {
                            TimeUnit.MILLISECONDS.sleep(random.nextInt(5));
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        return Observable.just(i);
                    });

                    Stopwatch measure = Stopwatch.createUnstarted();
                    return pipelineToMeasure
                            .doOnSubscribe(measure::start)
                            .doOnTerminate(() -> {
                                measure.stop();
                                System.out.println(measure);
                            });
                }
        )
        .toBlocking()
        .forEach(System.out::println);

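To avoid confusion: I use nest so that I do not have to re-create the Observable myself inside the outer flatMap. I am also using the Stopwatch class from the Guava library.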


To give more information, here is some code that measures within the forEach statement when blocking.

MeasurableAction1<Integer> measuring = MeasurableAction1.measure(System.out::println);
Observable
        .just(1, 2, 3)
        .flatMap(Observable::just)
        .toBlocking()
        .forEach(measuring.start());

measuring.stop().elapsed(TimeUnit.SECONDS);

And the measuring class:

private static class MeasurableAction1<T> implements Action1<T> {
    private Stopwatch measure = Stopwatch.createUnstarted();
    private Action1<? super T> action;

    public MeasurableAction1(Action1<? super T> action) {
        this.action = action;
    }

    @Override
    public void call(T t) {
        action.call(t);
    }

    public MeasurableAction1<T> start() {
        measure.start();
        return this;
    }

    public MeasurableAction1<T> stop() {
        measure.stop();
        return this;
    }

    public long elapsed(TimeUnit desiredUnit) {
        return measure.elapsed(desiredUnit);
    }

    public static <T> MeasurableAction1<T> measure(Action1<? super T> action) {
        return new MeasurableAction1<>(action);
    }
}
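
Note that start() runs before forEach subscribes, so the stopwatch above covers the whole blocking call, including the upstream work. If only the time spent inside the action matters, one possible variation is to time each call and sum the results. The sketch below shows that with a plain java.util.function.Consumer so it runs without RxJava or Guava; TimedConsumer is a made-up name, and the same idea should carry over to Action1.

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical wrapper, not part of RxJava or Guava.
final class TimedConsumer<T> implements Consumer<T> {
    private final Consumer<? super T> delegate;
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicLong calls = new AtomicLong();

    TimedConsumer(Consumer<? super T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T item) {
        long start = System.nanoTime();
        try {
            delegate.accept(item);
        } finally {
            // Record the call even if the delegate throws.
            totalNanos.addAndGet(System.nanoTime() - start);
            calls.incrementAndGet();
        }
    }

    // Total time spent inside the delegate, converted to the requested unit.
    long elapsed(TimeUnit unit) {
        return unit.convert(totalNanos.get(), TimeUnit.NANOSECONDS);
    }

    long calls() {
        return calls.get();
    }

    public static void main(String[] args) {
        TimedConsumer<Integer> timed = new TimedConsumer<Integer>(System.out::println);
        Arrays.asList(1, 2, 3).forEach(timed);
        System.out.println(timed.calls() + " calls took "
                + timed.elapsed(TimeUnit.MICROSECONDS) + " us");
    }
}
```

The atomic counters are there because, in a reactive pipeline, the action may be invoked from a thread other than the one that reads the total.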

Even better is to avoid blocking and use a Subscriber. Note that .subscribe offers more options than its .forEach alias (whether blocking or not):

    Observable
            .just(1, 2, 3)
            .flatMap(Observable::just)
            .subscribe(MeasuringSubscriber.measuringSubscriber(
                    System.out::println,
                    System.out::println,
                    System.out::println
            ));

The subscriber:

private static class MeasuringSubscriber<T> extends Subscriber<T> {
    private Stopwatch measure = Stopwatch.createUnstarted();
    private Action1<? super T> onNext;
    private final Action1<Throwable> onError;
    private final Action0 onComplete;

    public MeasuringSubscriber(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onComplete) {
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
    }

    @Override
    public void onCompleted() {
        try {
            onComplete.call();
        } finally {
            stopAndPrintMeasure();
        }
    }

    @Override
    public void onError(Throwable e) {
        try {
            onError.call(e);
        } finally {
            stopAndPrintMeasure();
        }
    }

    @Override
    public void onNext(T item) {
        onNext.call(item);
    }

    @Override
    public void onStart() {
        measure.start();
        super.onStart();
    }

    private void stopAndPrintMeasure() {
        measure.stop();
        System.out.println("took " + measure);
    }

    private static <T> MeasuringSubscriber<T> measuringSubscriber(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
        return new MeasuringSubscriber<>(onNext, onError, onComplete);
    }
}

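One option is to create an Observable that emits the timings. You can do this by wrapping your computation with Observable#using:

import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Joiner;

import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;

public class TimerExample {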
    public static void main(String[] args) {
        final PublishSubject<Long> timings = PublishSubject.create();

        final Observable<List<Integer>> list = Observable
                .just(1, 2, 3)
                .flatMap(TimerExample::longRunningComputation)
                .toList();

        final Observable<List<Integer>> timed
                = Observable.using(() -> new Timer(timings), (t) -> list, Timer::time);

        timings.subscribe(time -> System.out.println("Time: " + time + "ms"));

        List<Integer> ints = timed.toBlocking().last();
        System.out.println("ints: " + Joiner.on(", ").join(ints));

        ints = timed.toBlocking().last();
        System.out.println("ints: " + Joiner.on(", ").join(ints));
    }

    private static Observable<Integer> longRunningComputation(Integer i) {
        return Observable.timer(1, TimeUnit.SECONDS).map(ignored -> i);
    }

    public static class Timer {
        private final long startTime;
        private final Observer<Long> timings;

        public Timer(Observer<Long> timings) {
            this.startTime = System.currentTimeMillis();
            this.timings = timings;
        }

        public void time() {
            timings.onNext(System.currentTimeMillis() - startTime);
        }
    }
}

In this case the timings are printed to the console, but you can use them however you like:

Time: 1089ms
ints: 2, 1, 3
Time: 1003ms
ints: 1, 3, 2
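
Observable#using ties the lifetime of a resource to a subscription, much as try-with-resources ties it to a block. For readers who want to try the timer-as-resource idea without RxJava, here is a rough plain-Java analogue. ScopedTimer and its callback parameter are hypothetical names, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical helper: starts timing on construction, reports on close().
final class ScopedTimer implements AutoCloseable {
    private final long startNanos = System.nanoTime();
    private final LongConsumer onElapsedMillis;

    ScopedTimer(LongConsumer onElapsedMillis) {
        this.onElapsedMillis = onElapsedMillis;
    }

    @Override
    public void close() {
        // Report the elapsed time in milliseconds to the callback.
        onElapsedMillis.accept((System.nanoTime() - startNanos) / 1_000_000);
    }

    public static void main(String[] args) throws InterruptedException {
        List<Long> timings = new ArrayList<>();
        try (ScopedTimer ignored = new ScopedTimer(timings::add)) {
            Thread.sleep(20); // stand-in for the computation being timed
        }
        System.out.println("Time: " + timings.get(0) + "ms");
    }
}
```

As with the PublishSubject above, the callback decouples taking the measurement from deciding what to do with it.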