如何计算rx中的处理时间
How to calculate the processing time in rx
对于以下流程,我想知道如何计算处理 forEach(...) 中所有数据所需的时间。
// Question snippet: the goal is to time everything done inside the blocking forEach(...).
// The lambda bodies are placeholders for the real mapping / parsing logic.
Observable
        .from(1, 2, 3)
        .flatMap(it -> { /* ... */ })
        .toBlocking()
        .forEach(it -> { /* some parsing logic here */ });
编辑
阅读本教程后:Leaving the Monad,我觉得简单的解决方案是执行以下操作。如果我遗漏了什么请告诉我
// toList() only yields an Observable that emits one List; blocking on it with
// toBlocking().single() is what actually hands back the collected items.
List<?> items = Observable
        .from(1, 2, 3)
        .flatMap(it -> { /* ... */ })
        .toList()
        .toBlocking()
        .single();

// Only the plain loop over the already-collected items is timed here.
long startTime = System.currentTimeMillis();
for (Object it : items) {
    // some parsing here
}
long processingTime = System.currentTimeMillis() - startTime;
我认为这就是你想要的。我把你代码中产生值的部分(这里用 Observable.range,
对应你示例中的 Observable.just)
与要测量的管道拆分开来,并在管道中加入了一些模拟计算。
思路是把要测量的管道包裹在一个 flatMap 中,并在这个 flatMap 内加上秒表。
// Produces the values, then measures the inner pipeline as a whole with a single stopwatch.
Observable.range(1, 10_000)
        // nest() hands the source Observable itself to the outer flatMap as one item.
        .nest()
        .flatMap(o -> {
            // One generator for the whole pipeline: re-creating it with the same seed for
            // every element would make nextInt(5) return the same delay each time.
            Random random = new Random(73);

            // The pipeline under measurement: a fake computation that sleeps 0-4 ms per item.
            Observable<Integer> pipelineToMeasure = o.flatMap(i -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(5));
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the interruption.
                    Thread.currentThread().interrupt();
                }
                return Observable.just(i);
            });

            // Started on subscription, stopped and printed when the pipeline terminates.
            Stopwatch measure = Stopwatch.createUnstarted();
            return pipelineToMeasure
                    .doOnSubscribe(measure::start)
                    .doOnTerminate(() -> {
                        measure.stop();
                        System.out.println(measure);
                    });
        })
        .toBlocking()
        .forEach(System.out::println);
为了避免混淆,我使用 nest,
这样就不必在外层 flatMap 中自己重新创建 Observable。
我还使用了 Guava 库的 Stopwatch。
为了提供更多信息,这里有一段代码可以在阻塞时在 forEach
语句中进行测量。
// Wrap the per-item action so the time spent in the blocking forEach can be read afterwards.
final MeasurableAction1<Integer> measuring = MeasurableAction1.measure(System.out::println);
final Observable<Integer> source = Observable.just(1, 2, 3).flatMap(Observable::just);

// start() is evaluated as the forEach argument, so the stopwatch is running before forEach is invoked.
source.toBlocking().forEach(measuring.start());

// forEach has returned, i.e. every item was handled: stop and read the elapsed time.
measuring.stop().elapsed(TimeUnit.SECONDS);
和测量 class :
/**
 * An {@link Action1} wrapper that carries its own {@link Stopwatch}.
 *
 * <p>Calling the action only forwards the item to the wrapped action; the stopwatch
 * is controlled explicitly by the caller through {@link #start()} and {@link #stop()}.
 *
 * @param <T> type of the items handled by the action
 */
private static class MeasurableAction1<T> implements Action1<T> {
    private final Stopwatch stopwatch = Stopwatch.createUnstarted();
    private final Action1<? super T> delegate;

    /**
     * Factory shortcut for wrapping an action.
     *
     * @param action the action to forward items to
     * @return a new, not yet started, measurable action
     */
    public static <T> MeasurableAction1<T> measure(Action1<? super T> action) {
        return new MeasurableAction1<>(action);
    }

    /**
     * @param action the action to forward items to
     */
    public MeasurableAction1(Action1<? super T> action) {
        this.delegate = action;
    }

    /** Forwards the item to the wrapped action; does not touch the stopwatch. */
    @Override
    public void call(T t) {
        delegate.call(t);
    }

    /**
     * Starts the stopwatch.
     *
     * @return this instance, so the call can be used inline as the action argument
     */
    public MeasurableAction1<T> start() {
        stopwatch.start();
        return this;
    }

    /**
     * Stops the stopwatch.
     *
     * @return this instance, so {@link #elapsed(TimeUnit)} can be chained
     */
    public MeasurableAction1<T> stop() {
        stopwatch.stop();
        return this;
    }

    /**
     * @param desiredUnit unit in which to express the measured time
     * @return the time recorded by the stopwatch, in {@code desiredUnit}
     */
    public long elapsed(TimeUnit desiredUnit) {
        return stopwatch.elapsed(desiredUnit);
    }
}
更好的做法是不阻塞,而是使用订阅者(Subscriber)。请注意,与 .forEach
别名相比,.subscribe
提供了更多选项(无论是否阻塞):
Observable
.just(1, 2, 3)
.flatMap(Observable::just)
.subscribe(MeasuringSubscriber.measuringSubscriber(
System.out::println,
System.out::println,
System.out::println
));
订阅者:
/**
 * A {@link Subscriber} that times a subscription: the stopwatch starts in
 * {@link #onStart()} and is stopped and printed once the stream completes or fails.
 * Item, error and completion handling are delegated to the supplied callbacks.
 *
 * @param <T> type of the items received
 */
private static class MeasuringSubscriber<T> extends Subscriber<T> {
    private final Stopwatch stopwatch = Stopwatch.createUnstarted();
    private final Action1<? super T> itemHandler;
    private final Action1<Throwable> errorHandler;
    private final Action0 completionHandler;

    /**
     * Factory shortcut for {@link #MeasuringSubscriber(Action1, Action1, Action0)}.
     */
    private static <T> MeasuringSubscriber<T> measuringSubscriber(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
        return new MeasuringSubscriber<>(onNext, onError, onComplete);
    }

    /**
     * @param onNext     callback invoked for every item
     * @param onError    callback invoked when the stream fails
     * @param onComplete callback invoked when the stream completes
     */
    public MeasuringSubscriber(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onComplete) {
        this.itemHandler = onNext;
        this.errorHandler = onError;
        this.completionHandler = onComplete;
    }

    /** Starts the stopwatch before delegating to the default start-up behaviour. */
    @Override
    public void onStart() {
        stopwatch.start();
        super.onStart();
    }

    /** Passes the item to the item callback. */
    @Override
    public void onNext(T item) {
        itemHandler.call(item);
    }

    /** Runs the completion callback, then reports the timing even if the callback throws. */
    @Override
    public void onCompleted() {
        try {
            completionHandler.call();
        } finally {
            reportElapsed();
        }
    }

    /** Runs the error callback, then reports the timing even if the callback throws. */
    @Override
    public void onError(Throwable e) {
        try {
            errorHandler.call(e);
        } finally {
            reportElapsed();
        }
    }

    /** Stops the stopwatch and prints the measured duration. */
    private void reportElapsed() {
        stopwatch.stop();
        System.out.println("took " + stopwatch);
    }
}
一个选项是创建一个 Observable 来输出时间。您可以通过使用 Observable#using
包装您的计算来做到这一点:
/**
 * Demonstrates publishing the duration of a computation as its own stream:
 * the computation is wrapped with {@code Observable#using}, whose resource is a
 * {@link Timer} that reports the elapsed time when it is disposed.
 */
public class TimerExample {
    public static void main(String[] args) {
        // Every measured duration (in milliseconds) is pushed into this subject.
        final PublishSubject<Long> timings = PublishSubject.create();

        // The computation to measure.
        final Observable<List<Integer>> list = Observable
                .just(1, 2, 3)
                .flatMap(TimerExample::longRunningComputation)
                .toList();

        // The Timer is created by the resource factory and Timer::time is the dispose action.
        final Observable<List<Integer>> timed
                = Observable.using(() -> new Timer(timings), (t) -> list, Timer::time);

        timings.subscribe(time -> System.out.println("Time: " + time + "ms"));

        // Run the timed computation twice; each run uses its own Timer.
        List<Integer> ints = timed.toBlocking().last();
        System.out.println("ints: " + Joiner.on(", ").join(ints));
        ints = timed.toBlocking().last();
        System.out.println("ints: " + Joiner.on(", ").join(ints));
    }

    /** Simulates slow work: emits {@code i} after a one second timer fires. */
    private static Observable<Integer> longRunningComputation(Integer i) {
        return Observable.timer(1, TimeUnit.SECONDS).map(ignored -> i);
    }

    /** Remembers when it was created and reports the time elapsed since then. */
    public static class Timer {
        // Taken from System.nanoTime(): monotonic, so unaffected by wall-clock adjustments.
        private final long startNanos;
        private final Observer<Long> timings;

        /**
         * @param timings observer that receives the elapsed time in milliseconds
         */
        public Timer(Observer<Long> timings) {
            this.startNanos = System.nanoTime();
            this.timings = timings;
        }

        /** Emits the milliseconds elapsed since this timer was constructed. */
        public void time() {
            timings.onNext(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
        }
    }
}
在这种情况下,时间会打印到控制台,但您可以随意使用它们:
Time: 1089ms
ints: 2, 1, 3
Time: 1003ms
ints: 1, 3, 2
对于以下流程,我想知道如何计算处理 forEach(...) 中所有数据所需的时间。
// Question snippet: the goal is to time everything done inside the blocking forEach(...).
// The lambda bodies are placeholders for the real mapping / parsing logic.
Observable
        .from(1, 2, 3)
        .flatMap(it -> { /* ... */ })
        .toBlocking()
        .forEach(it -> { /* some parsing logic here */ });
编辑
阅读本教程后:Leaving the Monad,我觉得简单的解决方案是执行以下操作。如果我遗漏了什么请告诉我
// toList() only yields an Observable that emits one List; blocking on it with
// toBlocking().single() is what actually hands back the collected items.
List<?> items = Observable
        .from(1, 2, 3)
        .flatMap(it -> { /* ... */ })
        .toList()
        .toBlocking()
        .single();

// Only the plain loop over the already-collected items is timed here.
long startTime = System.currentTimeMillis();
for (Object it : items) {
    // some parsing here
}
long processingTime = System.currentTimeMillis() - startTime;
我认为这就是你想要的。我把你代码中产生值的部分(这里用 Observable.range,
对应你示例中的 Observable.just)
与要测量的管道拆分开来,并在管道中加入了一些模拟计算。
思路是把要测量的管道包裹在一个 flatMap 中,并在这个 flatMap 内加上秒表。
// Produces the values, then measures the inner pipeline as a whole with a single stopwatch.
Observable.range(1, 10_000)
        // nest() hands the source Observable itself to the outer flatMap as one item.
        .nest()
        .flatMap(o -> {
            // One generator for the whole pipeline: re-creating it with the same seed for
            // every element would make nextInt(5) return the same delay each time.
            Random random = new Random(73);

            // The pipeline under measurement: a fake computation that sleeps 0-4 ms per item.
            Observable<Integer> pipelineToMeasure = o.flatMap(i -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(5));
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the interruption.
                    Thread.currentThread().interrupt();
                }
                return Observable.just(i);
            });

            // Started on subscription, stopped and printed when the pipeline terminates.
            Stopwatch measure = Stopwatch.createUnstarted();
            return pipelineToMeasure
                    .doOnSubscribe(measure::start)
                    .doOnTerminate(() -> {
                        measure.stop();
                        System.out.println(measure);
                    });
        })
        .toBlocking()
        .forEach(System.out::println);
为了避免混淆,我使用 nest,
这样就不必在外层 flatMap 中自己重新创建 Observable。
我还使用了 Guava 库的 Stopwatch。
为了提供更多信息,这里有一段代码可以在阻塞时在 forEach
语句中进行测量。
// Wrap the per-item action so the time spent in the blocking forEach can be read afterwards.
final MeasurableAction1<Integer> measuring = MeasurableAction1.measure(System.out::println);
final Observable<Integer> source = Observable.just(1, 2, 3).flatMap(Observable::just);

// start() is evaluated as the forEach argument, so the stopwatch is running before forEach is invoked.
source.toBlocking().forEach(measuring.start());

// forEach has returned, i.e. every item was handled: stop and read the elapsed time.
measuring.stop().elapsed(TimeUnit.SECONDS);
和测量 class :
/**
 * An {@link Action1} wrapper that carries its own {@link Stopwatch}.
 *
 * <p>Calling the action only forwards the item to the wrapped action; the stopwatch
 * is controlled explicitly by the caller through {@link #start()} and {@link #stop()}.
 *
 * @param <T> type of the items handled by the action
 */
private static class MeasurableAction1<T> implements Action1<T> {
    private final Stopwatch stopwatch = Stopwatch.createUnstarted();
    private final Action1<? super T> delegate;

    /**
     * Factory shortcut for wrapping an action.
     *
     * @param action the action to forward items to
     * @return a new, not yet started, measurable action
     */
    public static <T> MeasurableAction1<T> measure(Action1<? super T> action) {
        return new MeasurableAction1<>(action);
    }

    /**
     * @param action the action to forward items to
     */
    public MeasurableAction1(Action1<? super T> action) {
        this.delegate = action;
    }

    /** Forwards the item to the wrapped action; does not touch the stopwatch. */
    @Override
    public void call(T t) {
        delegate.call(t);
    }

    /**
     * Starts the stopwatch.
     *
     * @return this instance, so the call can be used inline as the action argument
     */
    public MeasurableAction1<T> start() {
        stopwatch.start();
        return this;
    }

    /**
     * Stops the stopwatch.
     *
     * @return this instance, so {@link #elapsed(TimeUnit)} can be chained
     */
    public MeasurableAction1<T> stop() {
        stopwatch.stop();
        return this;
    }

    /**
     * @param desiredUnit unit in which to express the measured time
     * @return the time recorded by the stopwatch, in {@code desiredUnit}
     */
    public long elapsed(TimeUnit desiredUnit) {
        return stopwatch.elapsed(desiredUnit);
    }
}
更好的做法是不阻塞,而是使用订阅者(Subscriber)。请注意,与 .forEach
别名相比,.subscribe
提供了更多选项(无论是否阻塞):
Observable
.just(1, 2, 3)
.flatMap(Observable::just)
.subscribe(MeasuringSubscriber.measuringSubscriber(
System.out::println,
System.out::println,
System.out::println
));
订阅者:
/**
 * A {@link Subscriber} that times a subscription: the stopwatch starts in
 * {@link #onStart()} and is stopped and printed once the stream completes or fails.
 * Item, error and completion handling are delegated to the supplied callbacks.
 *
 * @param <T> type of the items received
 */
private static class MeasuringSubscriber<T> extends Subscriber<T> {
    private final Stopwatch stopwatch = Stopwatch.createUnstarted();
    private final Action1<? super T> itemHandler;
    private final Action1<Throwable> errorHandler;
    private final Action0 completionHandler;

    /**
     * Factory shortcut for {@link #MeasuringSubscriber(Action1, Action1, Action0)}.
     */
    private static <T> MeasuringSubscriber<T> measuringSubscriber(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
        return new MeasuringSubscriber<>(onNext, onError, onComplete);
    }

    /**
     * @param onNext     callback invoked for every item
     * @param onError    callback invoked when the stream fails
     * @param onComplete callback invoked when the stream completes
     */
    public MeasuringSubscriber(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onComplete) {
        this.itemHandler = onNext;
        this.errorHandler = onError;
        this.completionHandler = onComplete;
    }

    /** Starts the stopwatch before delegating to the default start-up behaviour. */
    @Override
    public void onStart() {
        stopwatch.start();
        super.onStart();
    }

    /** Passes the item to the item callback. */
    @Override
    public void onNext(T item) {
        itemHandler.call(item);
    }

    /** Runs the completion callback, then reports the timing even if the callback throws. */
    @Override
    public void onCompleted() {
        try {
            completionHandler.call();
        } finally {
            reportElapsed();
        }
    }

    /** Runs the error callback, then reports the timing even if the callback throws. */
    @Override
    public void onError(Throwable e) {
        try {
            errorHandler.call(e);
        } finally {
            reportElapsed();
        }
    }

    /** Stops the stopwatch and prints the measured duration. */
    private void reportElapsed() {
        stopwatch.stop();
        System.out.println("took " + stopwatch);
    }
}
一个选项是创建一个 Observable 来输出时间。您可以通过使用 Observable#using
包装您的计算来做到这一点:
/**
 * Demonstrates publishing the duration of a computation as its own stream:
 * the computation is wrapped with {@code Observable#using}, whose resource is a
 * {@link Timer} that reports the elapsed time when it is disposed.
 */
public class TimerExample {
    public static void main(String[] args) {
        // Every measured duration (in milliseconds) is pushed into this subject.
        final PublishSubject<Long> timings = PublishSubject.create();

        // The computation to measure.
        final Observable<List<Integer>> list = Observable
                .just(1, 2, 3)
                .flatMap(TimerExample::longRunningComputation)
                .toList();

        // The Timer is created by the resource factory and Timer::time is the dispose action.
        final Observable<List<Integer>> timed
                = Observable.using(() -> new Timer(timings), (t) -> list, Timer::time);

        timings.subscribe(time -> System.out.println("Time: " + time + "ms"));

        // Run the timed computation twice; each run uses its own Timer.
        List<Integer> ints = timed.toBlocking().last();
        System.out.println("ints: " + Joiner.on(", ").join(ints));
        ints = timed.toBlocking().last();
        System.out.println("ints: " + Joiner.on(", ").join(ints));
    }

    /** Simulates slow work: emits {@code i} after a one second timer fires. */
    private static Observable<Integer> longRunningComputation(Integer i) {
        return Observable.timer(1, TimeUnit.SECONDS).map(ignored -> i);
    }

    /** Remembers when it was created and reports the time elapsed since then. */
    public static class Timer {
        // Taken from System.nanoTime(): monotonic, so unaffected by wall-clock adjustments.
        private final long startNanos;
        private final Observer<Long> timings;

        /**
         * @param timings observer that receives the elapsed time in milliseconds
         */
        public Timer(Observer<Long> timings) {
            this.startNanos = System.nanoTime();
            this.timings = timings;
        }

        /** Emits the milliseconds elapsed since this timer was constructed. */
        public void time() {
            timings.onNext(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
        }
    }
}
在这种情况下,时间会打印到控制台,但您可以随意使用它们:
Time: 1089ms
ints: 2, 1, 3
Time: 1003ms
ints: 1, 3, 2