RxJava:在没有完成/取消订阅的情况下调用 onError
RxJava: calling onError without finishing / unsubscribing
我有以下代码 (*),它使用递归调用提供的可观察对象的调度程序实现轮询。
(*) 灵感来自 https://github.com/ReactiveX/RxJava/issues/448
当我只将 onNext
事件传递给订阅者时,它工作正常。但是当我将 onError
事件传递给订阅者时,会调用取消订阅事件,这反过来会终止调度程序。
我还想将错误传递给订阅者。有什么想法可以实现吗?
public Observable<Status> observe() {
return Observable.create(new PollingSubscriberAction<>(service.getStatusObservable(), 5, TimeUnit.SECONDS));
}
private class PollingSubscriberAction<T> implements Observable.OnSubscribe<T> {
private Subscription subscription;
private Subscription innerSubscription;
private Scheduler.Worker worker = Schedulers.newThread().createWorker();
private Observable<T> observable;
private long delayTime;
private TimeUnit unit;
public PollingSubscriberAction(final Observable<T> observable, long delayTime, TimeUnit unit) {
this.observable = observable;
this.delayTime = delayTime;
this.unit = unit;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
subscription = worker.schedule(new Action0() {
@Override
public void call() {
schedule(subscriber, true);
}
});
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
subscription.unsubscribe();
if (innerSubscription != null) {
innerSubscription.unsubscribe();
}
}
}));
}
private void schedule(final Subscriber<? super T> subscriber, boolean immediately) {
long delayTime = immediately ? 0 : this.delayTime;
subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit);
}
private Action0 createInnerAction(final Subscriber<? super T> subscriber) {
return new Action0() {
@Override
public void call() {
innerSubscription = observable.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
schedule(subscriber, false);
}
@Override
public void onError(Throwable e) {
// Doesn't work.
// subscriber.onError(e);
schedule(subscriber, false);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
});
}
};
}
}
onError 和 onCompleted 都是终止事件,这意味着您的 Observable 不会在任何事件发生后发出任何新事件。为了 swallow/handle 错误情况,请参阅错误运算符 - https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators. Also, in order to implement polling you might take advantage of this one - http://reactivex.io/documentation/operators/interval.html
所以我玩这个已经有一段时间了,我不认为你这样做是可能的。调用 onError
或 onCompleted
终止流,翻转 SafeSubscriber
包装器中的 done
标志,只是没有办法重置它。
我可以看到 2 个可用选项 - 我认为都不是特别优雅,但可以使用。
1 - UnsafeSubscribe
。可能不是最好的主意,但它确实有效,因为它不是将 Subscriber
包装在 SafeSubscriber
中,而是直接调用它。最好阅读 Javadoc 看看这是否适合您。或者,如果您喜欢冒险,请编写自己的 SafeSubscriber
,您可以在其中重置完成标志或类似标志。以你的例子,像这样调用:
observe.unsafeSubscribe(...)
2 - 实现类似于 this example 的东西。我很欣赏它在 C# 中,但它应该是可读的。简单地说 - 您想要创建一个 Pair<T, Exception>
class,然后不是调用 onError
,而是调用 onNext
并设置您的对的异常端。您的订阅者必须更聪明地检查一对的每一侧,并且您可能需要在源 Observable
和 Observable<Pair<T, Exception>>
之间进行一些数据转换,但我看不到为什么它不起作用。
如果有人有的话,我真的很想看看另一种方法。
希望这对您有所帮助,
会
正如@Will 指出的那样,您不能在不终止可观察对象的情况下直接调用 onError
。由于您只能调用 onNext
,我决定使用 Notification 将值和可抛出对象包装在一个对象中。
import rx.*;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
import java.util.concurrent.TimeUnit;
public class PollingObservable {
public static <T> Observable<Notification<T>> create(Observable<T> observable, long delayTime, TimeUnit unit) {
return Observable.create(new OnSubscribePolling<>(observable, delayTime, unit));
}
private static class OnSubscribePolling<T> implements Observable.OnSubscribe<Notification<T>> {
private Subscription subscription;
private Subscription innerSubscription;
private Scheduler.Worker worker = Schedulers.newThread().createWorker();
private Observable<T> observable;
private long delayTime;
private TimeUnit unit;
private boolean isUnsubscribed = false;
public OnSubscribePolling(final Observable<T> observable, long delayTime, TimeUnit unit) {
this.observable = observable;
this.delayTime = delayTime;
this.unit = unit;
}
@Override
public void call(final Subscriber<? super Notification<T>> subscriber) {
subscription = worker.schedule(new Action0() {
@Override
public void call() {
schedule(subscriber, true);
}
});
subscriber.onStart();
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
isUnsubscribed = true;
subscription.unsubscribe();
if (innerSubscription != null) {
innerSubscription.unsubscribe();
}
}
}));
}
private void schedule(final Subscriber<? super Notification<T>> subscriber, boolean immediately) {
if (isUnsubscribed) {
return;
}
long delayTime = immediately ? 0 : this.delayTime;
subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit);
}
private Action0 createInnerAction(final Subscriber<? super Notification<T>> subscriber) {
return new Action0() {
@Override
public void call() {
innerSubscription = observable.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
schedule(subscriber, false);
}
@Override
public void onError(Throwable e) {
subscriber.onNext(Notification.<T>createOnError(e));
schedule(subscriber, false);
}
@Override
public void onNext(T t) {
subscriber.onNext(Notification.createOnNext(t));
}
});
}
};
}
}
}
要使用它,您可以直接使用通知:
PollingObservable.create(service.getStatus(), 5, TimeUnit.SECONDS)
.subscribe(new Action1<Notification<Status>>() {
@Override
public void call(Notification<Status> notification) {
switch (notification.getKind()) {
case OnNext:
Status status = notification.getValue();
// handle onNext event
break;
case OnError:
Throwable throwable = notification.getThrowable();
// handle onError event
break;
}
}
});
或者您可以在通知上使用 accept 方法来使用常规 Observable:
PollingObservable.create(service.getStatus(), 5, TimeUnit.SECONDS)
.subscribe(new Action1<Notification<Status>>() {
@Override
public void call(Notification<Status> notification) {
notification.accept(statusObserver);
}
});
Observer<Status> statusObserver = new Observer<Status>() {
// ...
}
更新 2015-02-24
似乎轮询 observable 有时无法正常工作,因为内部 observable 即使在取消订阅后也会调用 onComplete
或 onError
,从而重新安排自己。我添加了 isUnsubscribed
标志以防止这种情况发生。
我有以下代码 (*),它使用递归调用提供的可观察对象的调度程序实现轮询。
(*) 灵感来自 https://github.com/ReactiveX/RxJava/issues/448
当我只将 onNext
事件传递给订阅者时,它工作正常。但是当我将 onError
事件传递给订阅者时,会调用取消订阅事件,这反过来会终止调度程序。
我还想将错误传递给订阅者。有什么想法可以实现吗?
public Observable<Status> observe() {
return Observable.create(new PollingSubscriberAction<>(service.getStatusObservable(), 5, TimeUnit.SECONDS));
}
private class PollingSubscriberAction<T> implements Observable.OnSubscribe<T> {
private Subscription subscription;
private Subscription innerSubscription;
private Scheduler.Worker worker = Schedulers.newThread().createWorker();
private Observable<T> observable;
private long delayTime;
private TimeUnit unit;
public PollingSubscriberAction(final Observable<T> observable, long delayTime, TimeUnit unit) {
this.observable = observable;
this.delayTime = delayTime;
this.unit = unit;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
subscription = worker.schedule(new Action0() {
@Override
public void call() {
schedule(subscriber, true);
}
});
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
subscription.unsubscribe();
if (innerSubscription != null) {
innerSubscription.unsubscribe();
}
}
}));
}
private void schedule(final Subscriber<? super T> subscriber, boolean immediately) {
long delayTime = immediately ? 0 : this.delayTime;
subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit);
}
private Action0 createInnerAction(final Subscriber<? super T> subscriber) {
return new Action0() {
@Override
public void call() {
innerSubscription = observable.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
schedule(subscriber, false);
}
@Override
public void onError(Throwable e) {
// Doesn't work.
// subscriber.onError(e);
schedule(subscriber, false);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
});
}
};
}
}
onError 和 onCompleted 都是终止事件,这意味着您的 Observable 不会在任何事件发生后发出任何新事件。为了 swallow/handle 错误情况,请参阅错误运算符 - https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators. Also, in order to implement polling you might take advantage of this one - http://reactivex.io/documentation/operators/interval.html
所以我玩这个已经有一段时间了,我不认为你这样做是可能的。调用 onError
或 onCompleted
终止流,翻转 SafeSubscriber
包装器中的 done
标志,只是没有办法重置它。
我可以看到 2 个可用选项 - 我认为都不是特别优雅,但可以使用。
1 - UnsafeSubscribe
。可能不是最好的主意,但它确实有效,因为它不是将 Subscriber
包装在 SafeSubscriber
中,而是直接调用它。最好阅读 Javadoc 看看这是否适合您。或者,如果您喜欢冒险,请编写自己的 SafeSubscriber
,您可以在其中重置完成标志或类似标志。以你的例子,像这样调用:
observe.unsafeSubscribe(...)
2 - 实现类似于 this example 的东西。我很欣赏它在 C# 中,但它应该是可读的。简单地说 - 您想要创建一个 Pair<T, Exception>
class,然后不是调用 onError
,而是调用 onNext
并设置您的对的异常端。您的订阅者必须更聪明地检查一对的每一侧,并且您可能需要在源 Observable
和 Observable<Pair<T, Exception>>
之间进行一些数据转换,但我看不到为什么它不起作用。
如果有人有的话,我真的很想看看另一种方法。
希望这对您有所帮助,
会
正如@Will 指出的那样,您不能在不终止可观察对象的情况下直接调用 onError
。由于您只能调用 onNext
,我决定使用 Notification 将值和可抛出对象包装在一个对象中。
import rx.*;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
import java.util.concurrent.TimeUnit;
public class PollingObservable {
public static <T> Observable<Notification<T>> create(Observable<T> observable, long delayTime, TimeUnit unit) {
return Observable.create(new OnSubscribePolling<>(observable, delayTime, unit));
}
private static class OnSubscribePolling<T> implements Observable.OnSubscribe<Notification<T>> {
private Subscription subscription;
private Subscription innerSubscription;
private Scheduler.Worker worker = Schedulers.newThread().createWorker();
private Observable<T> observable;
private long delayTime;
private TimeUnit unit;
private boolean isUnsubscribed = false;
public OnSubscribePolling(final Observable<T> observable, long delayTime, TimeUnit unit) {
this.observable = observable;
this.delayTime = delayTime;
this.unit = unit;
}
@Override
public void call(final Subscriber<? super Notification<T>> subscriber) {
subscription = worker.schedule(new Action0() {
@Override
public void call() {
schedule(subscriber, true);
}
});
subscriber.onStart();
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
isUnsubscribed = true;
subscription.unsubscribe();
if (innerSubscription != null) {
innerSubscription.unsubscribe();
}
}
}));
}
private void schedule(final Subscriber<? super Notification<T>> subscriber, boolean immediately) {
if (isUnsubscribed) {
return;
}
long delayTime = immediately ? 0 : this.delayTime;
subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit);
}
private Action0 createInnerAction(final Subscriber<? super Notification<T>> subscriber) {
return new Action0() {
@Override
public void call() {
innerSubscription = observable.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
schedule(subscriber, false);
}
@Override
public void onError(Throwable e) {
subscriber.onNext(Notification.<T>createOnError(e));
schedule(subscriber, false);
}
@Override
public void onNext(T t) {
subscriber.onNext(Notification.createOnNext(t));
}
});
}
};
}
}
}
要使用它,您可以直接使用通知:
PollingObservable.create(service.getStatus(), 5, TimeUnit.SECONDS)
.subscribe(new Action1<Notification<Status>>() {
@Override
public void call(Notification<Status> notification) {
switch (notification.getKind()) {
case OnNext:
Status status = notification.getValue();
// handle onNext event
break;
case OnError:
Throwable throwable = notification.getThrowable();
// handle onError event
break;
}
}
});
或者您可以在通知上使用 accept 方法来使用常规 Observable:
PollingObservable.create(service.getStatus(), 5, TimeUnit.SECONDS)
.subscribe(new Action1<Notification<Status>>() {
@Override
public void call(Notification<Status> notification) {
notification.accept(statusObserver);
}
});
Observer<Status> statusObserver = new Observer<Status>() {
// ...
}
更新 2015-02-24
似乎轮询 observable 有时无法正常工作,因为内部 observable 即使在取消订阅后也会调用 onComplete
或 onError
,从而重新安排自己。我添加了 isUnsubscribed
标志以防止这种情况发生。