RxJS 捕获 **并** 重试 Observable

RxJS catch **and** retry an Observable

我的用例是将 Observable 映射到成功和失败的 redux 操作。我进行网络调用(具有提供承诺的功能),如果成功,我必须转发成功操作,如果失败则不是错误操作。 Observable 本身将继续运行。对于我所能搜索到的所有内容,RxJS 没有一种机制可以捕获错误并重试原始错误。我的代码中有以下我不满意的解决方案:

error$ = new Rx.Subject();

searchResultAction$ = search$
    .flatMap(getSearchResultsPromise)
    .map((resuls) => {
        return {
            type: 'SUCCESS_ACTION',
            payload: {
                results
            }
        }
    })
    .retryWhen((err$) => {
        return err$
            .pluck('query')
            .do(error$.onNext.bind(error$));
    });

searchErrorAction$
    .map((query) => {
        return {
            type: 'ERROR_ACTION',
            payload: {
                query,
                message: 'Error while retrieving data'
            }
        }
    });

action$ = Observable
    .merge(
        searchResultAction$,
        searchErrorAction$
    )
    .doOnError(err => console.error('Ignored error: ', err))
    .retry();

action$.subscribe(dispatch);

即我创建了一个主题,并将错误推送到该主题中,并从中创建了一个错误操作的 Observable。

有没有更好的替代方案来替代我所缺少的在 RxJS 中执行此操作?基本上我想发出发生错误的通知,然后继续 Observable 已经在做的事情。

这将重试失败的查询:

var action$ = search$
    .flatMap(value => {
        // create an observable that will execute
        // the query each time it is subscribed
        const query = Rx.Observable.defer(() => getSearchResultsPromise(value));

        // add a retry operation to this query
        return query.retryWhen(errors$ => errors$.do(err => {
            console.log("ignoring error: ", err);
        }));
    })
    .map(payload => ({ type: "SUCCESS_ACTION", payload }));

action$.subscribe(dispatcher);

如果不想重试,只想通知或忽略错误:

var action$ = search$
    .flatMap(value => {
        // create an observable that will execute
        // the query each time it is subscribed
        const query = Rx.Observable.defer(() => getSearchResultsPromise(value));

        // add a catch clause to "ignore" the error
        return query.catch(err => {
            console.log("ignoring error: ", err);
            return Observable.empty(); // no result for this query
        }));
    })
    .map(payload => ({ type: "SUCCESS_ACTION", payload }));

action$.subscribe(dispatcher);