如何使用 Rxjs 将 Subject "next" 的值合并到主流中
How to merge the value of a Subject "next" into the main stream with Rxjs
方法 retryWhen returns a Subject,如何将它的下一个值与主流合并?
dataStream.
retryWhen(errors => {
return errors.
delay(1000).
take(3).
concat(Rx.Observable.throw(errors));
}).
subscribe(
x => console.log('onNext:', x),
e => {
e.subscribe(function (value) {
console.log('err', value);
});
},
_ => console.log('onCompleted'));
现在它可以工作,但我必须订阅错误处理程序中的 e 才能获取 dataStream 抛出的错误值。有可能简化这个吗?
更新:
我想编写一个 "data-access routine" 具有弹性的代码。目前它将以 1 秒的延迟重试 3 次,如果出现错误,它将抛出 "error"。问题是它不会抛出错误而是抛出 "Subject" 并且我必须订阅它才能得到错误。
let getPromiseStream = function (endpoint) {
return Rx.Observable.
just(endpoint).
flatMap(requestUrl => {
return _this.$http.get(requestUrl);
});
};
let itemsStream = getPromiseStream('/items');
let locksStream = getPromiseStream('/error');
let usersStream = getPromiseStream('/users');
let favsStream = getPromiseStream('/favs');
let dataStream = Rx.Observable.
zip(
itemsStream,
locksStream,
usersStream,
favsStream,
(items, locks, users, favs) => {
return {items: items.data, locks: locks.data, users: users.data, favs: favs.data};
});
retryWhen
可能有点难以使用(我不得不再次查找以记住它的行为)。本质上 retryWhen
会将您从源 Observable
收到的所有错误转换为 onNext
调用。您传入的函数接收 that Observable,然后让您处理通过的事件。如果你想继续重试,它应该简单地传递事件。如果它应该出错那么你需要将 onNext
转换成 onError
或者如果你只是想让它停止尝试那么你应该将它转换成 onCompleted
.
因此,在您的代码中,您希望延迟一定次数,然后如果它确实不仅仅是暂时性网络错误,则将整个错误都排除在外。
dataStream.
retryWhen(errors => {
return errors
.flatMap((err, count) => {
return count < 3 ?
//This will just get flattened out and passed through
Rx.Observable.just(err) :
//If you have received 3 errors it is time to throw
//This error will get passed all the way to the final
//onError method if it isn't caught along the way
Rx.Observable.throw(err);
})
.delay(1000);
})
.subscribe(
x => console.log('onNext:', x),
e => console.log('err', value),
_ => console.log('onCompleted'));
附加说明:我建议您将 retryWhen
放在馈入 zip
的每个 Observable
上,而不是放在整个东西上。在后一种情况下,一个失败将重试 all 来源 Observables
而不仅仅是失败的来源。
方法 retryWhen returns a Subject,如何将它的下一个值与主流合并?
dataStream.
retryWhen(errors => {
return errors.
delay(1000).
take(3).
concat(Rx.Observable.throw(errors));
}).
subscribe(
x => console.log('onNext:', x),
e => {
e.subscribe(function (value) {
console.log('err', value);
});
},
_ => console.log('onCompleted'));
现在它可以工作,但我必须订阅错误处理程序中的 e 才能获取 dataStream 抛出的错误值。有可能简化这个吗?
更新:
我想编写一个 "data-access routine" 具有弹性的代码。目前它将以 1 秒的延迟重试 3 次,如果出现错误,它将抛出 "error"。问题是它不会抛出错误而是抛出 "Subject" 并且我必须订阅它才能得到错误。
let getPromiseStream = function (endpoint) {
return Rx.Observable.
just(endpoint).
flatMap(requestUrl => {
return _this.$http.get(requestUrl);
});
};
let itemsStream = getPromiseStream('/items');
let locksStream = getPromiseStream('/error');
let usersStream = getPromiseStream('/users');
let favsStream = getPromiseStream('/favs');
let dataStream = Rx.Observable.
zip(
itemsStream,
locksStream,
usersStream,
favsStream,
(items, locks, users, favs) => {
return {items: items.data, locks: locks.data, users: users.data, favs: favs.data};
});
retryWhen
可能有点难以使用(我不得不再次查找以记住它的行为)。本质上 retryWhen
会将您从源 Observable
收到的所有错误转换为 onNext
调用。您传入的函数接收 that Observable,然后让您处理通过的事件。如果你想继续重试,它应该简单地传递事件。如果它应该出错那么你需要将 onNext
转换成 onError
或者如果你只是想让它停止尝试那么你应该将它转换成 onCompleted
.
因此,在您的代码中,您希望延迟一定次数,然后如果它确实不仅仅是暂时性网络错误,则将整个错误都排除在外。
dataStream.
retryWhen(errors => {
return errors
.flatMap((err, count) => {
return count < 3 ?
//This will just get flattened out and passed through
Rx.Observable.just(err) :
//If you have received 3 errors it is time to throw
//This error will get passed all the way to the final
//onError method if it isn't caught along the way
Rx.Observable.throw(err);
})
.delay(1000);
})
.subscribe(
x => console.log('onNext:', x),
e => console.log('err', value),
_ => console.log('onCompleted'));
附加说明:我建议您将 retryWhen
放在馈入 zip
的每个 Observable
上,而不是放在整个东西上。在后一种情况下,一个失败将重试 all 来源 Observables
而不仅仅是失败的来源。