我可以跟踪 RxJava 订阅者中的事件消耗吗?
Can I trace consuming of events in RxJava Subscriber?
我想跟踪订阅者何时开始使用事件以及何时完成。
有没有适用于所有 Observables/Subscribers 的通用方法?
是 1.x 还是 2.x?对于 2.x 它可能会变得非常复杂,因为必须考虑所有内部协议,以免意外取消优化您的流程。
否则,它可以像写一个 Observer
一样简单,在真实的 Observer
和运算符之间填充:
import io.reactivex.Observer;
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
if (!observable.getClass().getName().toLowerCase().contains("map")) {
return observer;
}
System.out.println("Started");
class SignalTracker implements Observer<Object>, Disposable {
Disposable upstream;
@Override public void onSubscribe(Disposable d) {
upstream = d;
// write the code here that has to react to establishing the subscription
observer.onSubscribe(this);
}
@Override public void onNext(Object o) {
// handle onNext before or aftern notifying the downstream
observer.onNext(o);
}
@Override public void onError(Throwable t) {
// handle onError
observer.onError(t);
}
@Override public void onComplete() {
// handle onComplete
System.out.println("Completed");
observer.onComplete();
}
@Override public void dispose() {
// handle dispose
upstream.dispose();
}
@Override public boolean isDisposed() {
return upstream.isDisposed();
}
}
return new SignalTracker();
});
Observable<Integer> observable = Observable.range(1, 5)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map(integer -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 3;
});
observable.subscribe(System.out::println);
Thread.sleep(6000L);
打印:
Started
3
6
9
12
15
Completed
编辑: RxJava 1 版本需要更多的 lambda,但可行:
RxJavaHooks.setOnObservableStart((observable, onSubscribe) -> {
if (!onSubscribe.getClass().getName().toLowerCase().contains("map")) {
return onSubscribe;
}
System.out.println("Started");
return (Observable.OnSubscribe<Object>)observer -> {
class SignalTracker extends Subscriber<Object> {
@Override public void onNext(Object o) {
// handle onNext before or aftern notifying the downstream
observer.onNext(o);
}
@Override public void onError(Throwable t) {
// handle onError
observer.onError(t);
}
@Override public void onCompleted() {
// handle onComplete
System.out.println("Completed");
observer.onCompleted();
}
@Override public void setProducer(Producer p) {
observer.setProducer(p);
}
}
SignalTracker t = new SignalTracker()
observer.add(t);
onSubscribe.call(t);
};
});
Observable<Integer> observable = Observable.range(1, 5)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map(integer -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 3;
});
observable.subscribe(System.out::println);
Thread.sleep(6000L);
我想跟踪订阅者何时开始使用事件以及何时完成。 有没有适用于所有 Observables/Subscribers 的通用方法?
是 1.x 还是 2.x?对于 2.x 它可能会变得非常复杂,因为必须考虑所有内部协议,以免意外取消优化您的流程。
否则,它可以像写一个 Observer
一样简单,在真实的 Observer
和运算符之间填充:
import io.reactivex.Observer;
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
if (!observable.getClass().getName().toLowerCase().contains("map")) {
return observer;
}
System.out.println("Started");
class SignalTracker implements Observer<Object>, Disposable {
Disposable upstream;
@Override public void onSubscribe(Disposable d) {
upstream = d;
// write the code here that has to react to establishing the subscription
observer.onSubscribe(this);
}
@Override public void onNext(Object o) {
// handle onNext before or aftern notifying the downstream
observer.onNext(o);
}
@Override public void onError(Throwable t) {
// handle onError
observer.onError(t);
}
@Override public void onComplete() {
// handle onComplete
System.out.println("Completed");
observer.onComplete();
}
@Override public void dispose() {
// handle dispose
upstream.dispose();
}
@Override public boolean isDisposed() {
return upstream.isDisposed();
}
}
return new SignalTracker();
});
Observable<Integer> observable = Observable.range(1, 5)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map(integer -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 3;
});
observable.subscribe(System.out::println);
Thread.sleep(6000L);
打印:
Started
3
6
9
12
15
Completed
编辑: RxJava 1 版本需要更多的 lambda,但可行:
RxJavaHooks.setOnObservableStart((observable, onSubscribe) -> {
if (!onSubscribe.getClass().getName().toLowerCase().contains("map")) {
return onSubscribe;
}
System.out.println("Started");
return (Observable.OnSubscribe<Object>)observer -> {
class SignalTracker extends Subscriber<Object> {
@Override public void onNext(Object o) {
// handle onNext before or aftern notifying the downstream
observer.onNext(o);
}
@Override public void onError(Throwable t) {
// handle onError
observer.onError(t);
}
@Override public void onCompleted() {
// handle onComplete
System.out.println("Completed");
observer.onCompleted();
}
@Override public void setProducer(Producer p) {
observer.setProducer(p);
}
}
SignalTracker t = new SignalTracker()
observer.add(t);
onSubscribe.call(t);
};
});
Observable<Integer> observable = Observable.range(1, 5)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map(integer -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 3;
});
observable.subscribe(System.out::println);
Thread.sleep(6000L);