如何使用 Rxjs 将 Subject "next" 的值合并到主流中

How to merge the value of a Subject "next" into the main stream with Rxjs

方法 retryWhen returns a Subject,如何将它的下一个值与主流合并?

dataStream.
    retryWhen(errors => {
        return errors.
            delay(1000).
            take(3).
            concat(Rx.Observable.throw(errors));
    }).
    subscribe(
        x => console.log('onNext:', x),
        e => {
            e.subscribe(function (value) {
                console.log('err', value);
            });
        },
        _ => console.log('onCompleted'));

现在它可以工作,但我必须订阅错误处理程序中的 e 才能获取 dataStream 抛出的错误值。有可能简化这个吗?

更新:

我想编写一个 "data-access routine" 具有弹性的代码。目前它将以 1 秒的延迟重试 3 次,如果出现错误,它将抛出 "error"。问题是它不会抛出错误而是抛出 "Subject" 并且我必须订阅它才能得到错误。

let getPromiseStream = function (endpoint) {
    return Rx.Observable.
        just(endpoint).
        flatMap(requestUrl => {
            return _this.$http.get(requestUrl);
        });
};

let itemsStream = getPromiseStream('/items');
let locksStream = getPromiseStream('/error');
let usersStream = getPromiseStream('/users');
let favsStream = getPromiseStream('/favs');

let dataStream = Rx.Observable.
            zip(
            itemsStream,
            locksStream,
            usersStream,
            favsStream,
            (items, locks, users, favs) => {
                return {items: items.data, locks: locks.data, users: users.data, favs: favs.data};
            });

retryWhen 可能有点难以使用(我不得不再次查找以记住它的行为)。本质上 retryWhen 会将您从源 Observable 收到的所有错误转换为 onNext 调用。您传入的函数接收 that Observable,然后让您处理通过的事件。如果你想继续重试,它应该简单地传递事件。如果它应该出错那么你需要将 onNext 转换成 onError 或者如果你只是想让它停止尝试那么你应该将它转换成 onCompleted.

因此,在您的代码中,您希望延迟一定次数,然后如果它确实不仅仅是暂时性网络错误,则将整个错误都排除在外。

dataStream.
    retryWhen(errors => {
        return errors
            .flatMap((err, count) => {
              return count < 3 ? 
                //This will just get flattened out and passed through
                Rx.Observable.just(err) :
                //If you have received 3 errors it is time to throw
                //This error will get passed all the way to the final
                //onError method if it isn't caught along the way
                Rx.Observable.throw(err);
            })
            .delay(1000);
    })
    .subscribe(
        x => console.log('onNext:', x),
        e => console.log('err', value),
        _ => console.log('onCompleted'));

附加说明:我建议您将 retryWhen 放在馈入 zip 的每个 Observable 上,而不是放在整个东西上。在后一种情况下,一个失败将重试 all 来源 Observables 而不仅仅是失败的来源。