RxJs:如何根据可观察对象的状态进行循环?
RxJs: How to loop based on state of the observable?
我试图让 RxJs 循环遍历流中的 Observable 直到它处于特定状态,然后让流继续。具体来说,我正在将同步 do/while 循环转换为 RxJs,但我假设相同的答案也可以用于 for 或 while 循环。
我以为我可以为此使用 doWhile(),但条件函数似乎无法访问流中的项目,这似乎违背了我的目的。
我不完全确定正确的响应式术语是什么,但这是我想要的一个例子:
var source = new Rx.Observable.of({val: 0, counter: 3});
source.map(o => {
o.counter--;
console.log('Counter: ' + o.counter);
if (!o.counter) {
o.val = "YESS!";
}
return o;
})
.doWhile(o => {
return o.counter > 0;
})
.subscribe(
function (x) {
console.log('Next: ' + x.val);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
预期输出为:
Counter: 3
Counter: 2
Counter: 1
Counter: 0
Next: YESS!
Completed
假设这是一个可以解决的问题,我不清楚你如何在循环时标记 'start' 你想要 return 的地方。
有expand operator which gets you close by allowing you to recursively call a selector function. Returning an empty observable would be your break in that case. See jsbin:
var source = Rx.Observable.return({val: 0, counter: 3})
.expand(value => {
if(!value.counter) return Rx.Observable.empty();
value.counter -= 1;
if(!value.counter) value.val = 'YESS';
return Rx.Observable.return(value)
})
.subscribe(value => console.log(value.counter ?
'Counter: ' + value.counter :
'Next: ' + value.val));
不完全是你想要的,但关闭,使用 expand
运算符,并用 Rx.Observable.empty
(http://jsfiddle.net/naaycu71/3/):
发出递归结束信号
var source = new Rx.Observable.of({val: 0, counter: 3});
source.expand(function(o) {
console.log('Counter: ' + o.counter);
o.counter--;
return (o.counter >= 0) ? Rx.Observable.just(o) : Rx.Observable.empty()
})
.subscribe(
function (x) {
console.log('Next: ' , x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
输出:
Next: Object {val: 0, counter: 3}
Counter: 3
Next: Object {val: 0, counter: 2}
Counter: 2
Next: Object {val: 0, counter: 1}
Counter: 1
Next: Object {val: 0, counter: 0}
Counter: 0
Completed
我试图让 RxJs 循环遍历流中的 Observable 直到它处于特定状态,然后让流继续。具体来说,我正在将同步 do/while 循环转换为 RxJs,但我假设相同的答案也可以用于 for 或 while 循环。
我以为我可以为此使用 doWhile(),但条件函数似乎无法访问流中的项目,这似乎违背了我的目的。
我不完全确定正确的响应式术语是什么,但这是我想要的一个例子:
var source = new Rx.Observable.of({val: 0, counter: 3});
source.map(o => {
o.counter--;
console.log('Counter: ' + o.counter);
if (!o.counter) {
o.val = "YESS!";
}
return o;
})
.doWhile(o => {
return o.counter > 0;
})
.subscribe(
function (x) {
console.log('Next: ' + x.val);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
预期输出为:
Counter: 3
Counter: 2
Counter: 1
Counter: 0
Next: YESS!
Completed
假设这是一个可以解决的问题,我不清楚你如何在循环时标记 'start' 你想要 return 的地方。
有expand operator which gets you close by allowing you to recursively call a selector function. Returning an empty observable would be your break in that case. See jsbin:
var source = Rx.Observable.return({val: 0, counter: 3})
.expand(value => {
if(!value.counter) return Rx.Observable.empty();
value.counter -= 1;
if(!value.counter) value.val = 'YESS';
return Rx.Observable.return(value)
})
.subscribe(value => console.log(value.counter ?
'Counter: ' + value.counter :
'Next: ' + value.val));
不完全是你想要的,但关闭,使用 expand
运算符,并用 Rx.Observable.empty
(http://jsfiddle.net/naaycu71/3/):
var source = new Rx.Observable.of({val: 0, counter: 3});
source.expand(function(o) {
console.log('Counter: ' + o.counter);
o.counter--;
return (o.counter >= 0) ? Rx.Observable.just(o) : Rx.Observable.empty()
})
.subscribe(
function (x) {
console.log('Next: ' , x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
输出:
Next: Object {val: 0, counter: 3}
Counter: 3
Next: Object {val: 0, counter: 2}
Counter: 2
Next: Object {val: 0, counter: 1}
Counter: 1
Next: Object {val: 0, counter: 0}
Counter: 0
Completed