是否存在仅传播错误而不终止自身的可观察对象?
Is there an observable that just propagates the error without terminating itself?
我在 class 中使用 PublishSubject 负责同步。同步完成后,将通知所有订阅者。发生错误时也会发生同样的情况。
我注意到,下次我在发生错误后订阅时,它会立即 return 到订阅者。
所以 class 可能看起来像这样:
public class Synchronizer {
private final PublishSubject<Result> mSyncHeadObservable = PublishSubject.create();
private final ThreadPoolExecutor mExecutor = new ThreadPoolExecutor(1, 1,
10, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(true),
new ThreadPoolExecutor.DiscardPolicy());
public Observable<Result> syncHead(final int chunkSize) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//Do some work which either returns a result or throws an error
//...
mSyncHeadObservable.onNext(Notification.createOnNext(/*some result*/));
} catch (Throwable error) {
mSyncHeadObservable.onError(Notification.<Result>createOnError(error));
}
}
});
有没有可以作为代理的observable?可能是其他 Rx 方法?
更新:
我遵循了@akarnokd 的方法并发出了包装到 RxJava Notification
中的事件。然后通过 flatMap()
打开它们。所以 Synchronizer
class 的客户不需要这样做。
//...
private PublishSubject<Notification<Result>> mSyncHeadObservable = PublishSubject.create();
public Observable<Result> syncHead(final int chunkSize) {
return mSyncHeadObservable.flatMap(new Func1<Notification<Result>, Observable<Result>>() {
@Override
public Observable<Result> call(Notification<Result> result) {
if (result.isOnError()) {
return Observable.error(result.getThrowable());
}
return Observable.just(result.getValue());
}
}).doOnSubscribe(
new Action0() {
@Override
public void call() {
startHeadSync(chunkSize);
}
});
}
private void startHeadSync(final int chunkSize) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//Do some work which either returns a result or throws an error
//...
mSyncHeadObservable.onNext(Notification.createOnNext(/*some result*/));
} catch (Throwable error) {
mSyncHeadObservable.onError(Notification.<Result>createOnError(error));
}
}
});
}
//...
发生错误时,可观察对象达到终止状态。
如果你想继续观察它,你应该重新订阅你的observable with retry operator 或者使用另一个错误handling operators
我不确定你想用这个设置实现什么,但一般来说,为了避免 PublishSubject
的终止条件,你应该将你的值和错误包装到一个通用结构中并始终发出那些,从来没有 onError
和 onCompleted
。一种选择是使用 RxJava 自己的事件包装器,Notification
,你的 Subscriber
s 应该解包值。
我在 class 中使用 PublishSubject 负责同步。同步完成后,将通知所有订阅者。发生错误时也会发生同样的情况。 我注意到,下次我在发生错误后订阅时,它会立即 return 到订阅者。
所以 class 可能看起来像这样:
public class Synchronizer {
private final PublishSubject<Result> mSyncHeadObservable = PublishSubject.create();
private final ThreadPoolExecutor mExecutor = new ThreadPoolExecutor(1, 1,
10, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(true),
new ThreadPoolExecutor.DiscardPolicy());
public Observable<Result> syncHead(final int chunkSize) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//Do some work which either returns a result or throws an error
//...
mSyncHeadObservable.onNext(Notification.createOnNext(/*some result*/));
} catch (Throwable error) {
mSyncHeadObservable.onError(Notification.<Result>createOnError(error));
}
}
});
有没有可以作为代理的observable?可能是其他 Rx 方法?
更新:
我遵循了@akarnokd 的方法并发出了包装到 RxJava Notification
中的事件。然后通过 flatMap()
打开它们。所以 Synchronizer
class 的客户不需要这样做。
//...
private PublishSubject<Notification<Result>> mSyncHeadObservable = PublishSubject.create();
public Observable<Result> syncHead(final int chunkSize) {
return mSyncHeadObservable.flatMap(new Func1<Notification<Result>, Observable<Result>>() {
@Override
public Observable<Result> call(Notification<Result> result) {
if (result.isOnError()) {
return Observable.error(result.getThrowable());
}
return Observable.just(result.getValue());
}
}).doOnSubscribe(
new Action0() {
@Override
public void call() {
startHeadSync(chunkSize);
}
});
}
private void startHeadSync(final int chunkSize) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//Do some work which either returns a result or throws an error
//...
mSyncHeadObservable.onNext(Notification.createOnNext(/*some result*/));
} catch (Throwable error) {
mSyncHeadObservable.onError(Notification.<Result>createOnError(error));
}
}
});
}
//...
发生错误时,可观察对象达到终止状态。
如果你想继续观察它,你应该重新订阅你的observable with retry operator 或者使用另一个错误handling operators
我不确定你想用这个设置实现什么,但一般来说,为了避免 PublishSubject
的终止条件,你应该将你的值和错误包装到一个通用结构中并始终发出那些,从来没有 onError
和 onCompleted
。一种选择是使用 RxJava 自己的事件包装器,Notification
,你的 Subscriber
s 应该解包值。