在 RxJava 中,如何 reflect/extract Observable 之外的失败?
In RxJava, how to reflect/extract a failure outside of an Observable?
我们有一个调用 update(key, content) 方法的 StoreService,它使用 couchbase 客户端来执行 get-->change_content-->replace。
作为该过程的一部分,我们使用 Observable retryWhen 来处理异常。如果重试超过最大重试次数,它只会传递异常,然后触发观察者的 onError 方法。
如果无法处理错误,我们想做的是从 update(key, content) 方法向调用它的 StoreService 抛出异常,但我们没有这样做。
目前我们尝试了以下方法均未成功:
- 从 onError 方法抛出异常,但它不会从 Observable 中抛出。
- 抛出一个 RuntimeException,但它也不起作用。
- 使用其中包含布尔 isFailed 成员的 DTO:我们在 Observable 外部创建 DTO,如果发生错误,我们将到达订阅者的 onError,我们将 DTO 的 isFailed 设置为 true。 Observable 完成后,我们检查 DTO 是否失败,如果是,我们抛出异常。这也不起作用 - onError 中发生的更改没有传播到 Observable 之外(为什么?)
伪代码如下:
public void update(String key, ConversationDto updateConversationDto) {
ObservableExecution observableExecution = new ObservableExecution();
Observable
.defer(... get the document from couchbase ...)
.map(... handle JSON conversion and update the document ...)
.flatMap(documentUpdate -> {
return couchbaseClient.getAsyncBucket().replace(documentUpdate);
})
.retryWhen(new RetryWithDelay(3, 200))
.subscribe(
n -> logger.debug("on next update document -> " + n.content()),
e -> {
//logger.error("failed to insert a document",e);
observableExecution.setFailure(e);
},
() -> logger.debug("on complete update document")
);
// this is never true
if (observableExecution.isFailed()) {
final Throwable e = observableExecution.getFailure();
throw new DalRuntimeException(e.getMessage(), e);
}
}
这是 retryWhen 代码:
public Observable<?> call(Observable<? extends Throwable> attempts) {
return attempts
.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable errorNotification) {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
logger.debug(errorNotification + " retry no. " + retryCount);
return Observable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
logger.debug(errorNotification + " exceeded max retries " + maxRetries);
return Observable.error(errorNotification);
}
});
}
非常感谢您的帮助!
订阅 运行s 异步,因此 isFailed()
检查将始终 运行 紧接在 e -> setFailure(e)
代码 运行s 之前。
正确的做法是 return 来自 update()
方法的 Observable
并在 StoreService
中订阅它。这样,当您有兴趣处理成功和失败时,您就会收到通知。
我同意@Ross:从概念上讲,Observable 应该由 update() 返回。
我可以建议的唯一简化是使用局部可变变量而不是 ObservableExecution DTO:
public void update(String key, ConversationDto updateConversationDto) {
final Throwable[] errorHolder = new Throwable[1];
Observable
.defer(... get the document from couchbase ...)
.map(... handle JSON conversion and update the document ...)
.flatMap(documentUpdate -> {
return couchbaseClient.getAsyncBucket().replace(documentUpdate);
})
.retryWhen(new RetryWithDelay(3, 200))
.subscribe(
n -> logger.debug("on next update document -> " + n.content()),
e -> {
//logger.error("failed to insert a document",e);
errorHolder[0] = e;
},
() -> logger.debug("on complete update document")
);
if (errorHolder[0] != null) {
final Throwable e = errorHolder[0];
throw new DalRuntimeException(e.getMessage(), e);
}
}
我们有一个调用 update(key, content) 方法的 StoreService,它使用 couchbase 客户端来执行 get-->change_content-->replace。
作为该过程的一部分,我们使用 Observable retryWhen 来处理异常。如果重试超过最大重试次数,它只会传递异常,然后触发观察者的 onError 方法。
如果无法处理错误,我们想做的是从 update(key, content) 方法向调用它的 StoreService 抛出异常,但我们没有这样做。
目前我们尝试了以下方法均未成功:
- 从 onError 方法抛出异常,但它不会从 Observable 中抛出。
- 抛出一个 RuntimeException,但它也不起作用。
- 使用其中包含布尔 isFailed 成员的 DTO:我们在 Observable 外部创建 DTO,如果发生错误,我们将到达订阅者的 onError,我们将 DTO 的 isFailed 设置为 true。 Observable 完成后,我们检查 DTO 是否失败,如果是,我们抛出异常。这也不起作用 - onError 中发生的更改没有传播到 Observable 之外(为什么?)
伪代码如下:
public void update(String key, ConversationDto updateConversationDto) {
ObservableExecution observableExecution = new ObservableExecution();
Observable
.defer(... get the document from couchbase ...)
.map(... handle JSON conversion and update the document ...)
.flatMap(documentUpdate -> {
return couchbaseClient.getAsyncBucket().replace(documentUpdate);
})
.retryWhen(new RetryWithDelay(3, 200))
.subscribe(
n -> logger.debug("on next update document -> " + n.content()),
e -> {
//logger.error("failed to insert a document",e);
observableExecution.setFailure(e);
},
() -> logger.debug("on complete update document")
);
// this is never true
if (observableExecution.isFailed()) {
final Throwable e = observableExecution.getFailure();
throw new DalRuntimeException(e.getMessage(), e);
}
}
这是 retryWhen 代码:
public Observable<?> call(Observable<? extends Throwable> attempts) {
return attempts
.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable errorNotification) {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
logger.debug(errorNotification + " retry no. " + retryCount);
return Observable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
logger.debug(errorNotification + " exceeded max retries " + maxRetries);
return Observable.error(errorNotification);
}
});
}
非常感谢您的帮助!
订阅 运行s 异步,因此 isFailed()
检查将始终 运行 紧接在 e -> setFailure(e)
代码 运行s 之前。
正确的做法是 return 来自 update()
方法的 Observable
并在 StoreService
中订阅它。这样,当您有兴趣处理成功和失败时,您就会收到通知。
我同意@Ross:从概念上讲,Observable 应该由 update() 返回。 我可以建议的唯一简化是使用局部可变变量而不是 ObservableExecution DTO:
public void update(String key, ConversationDto updateConversationDto) {
final Throwable[] errorHolder = new Throwable[1];
Observable
.defer(... get the document from couchbase ...)
.map(... handle JSON conversion and update the document ...)
.flatMap(documentUpdate -> {
return couchbaseClient.getAsyncBucket().replace(documentUpdate);
})
.retryWhen(new RetryWithDelay(3, 200))
.subscribe(
n -> logger.debug("on next update document -> " + n.content()),
e -> {
//logger.error("failed to insert a document",e);
errorHolder[0] = e;
},
() -> logger.debug("on complete update document")
);
if (errorHolder[0] != null) {
final Throwable e = errorHolder[0];
throw new DalRuntimeException(e.getMessage(), e);
}
}