RxJS - 如何将 toArray() 与异步可观察对象数组一起使用?
RxJS - How to use toArray() with an array of asynchronous observables?
我正在使用 Rx.Observable.create()
创建一个异步可观察对象数组,并希望在它们完成时使用 .toArray()
获取所有值。
console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
return Rx.Observable.create((obs)=>{
let tid = setTimeout(()=>{
console.log(val + ' timing out');
obs.onNext(val);
},i*500);
return ()=>{
clearTimeout(tid);
};
}).publish().refCount();
});
Rx.Observable.from(valsArray)
.flatMap((v)=>v)
.toArray()
.subscribe((arr)=>{
console.log("arr should be ['a','b','c']",arr);
});
以上示例位于 http://jsbin.com/wegoha/10/edit?js,console。
使用 setTimeout
作为其他异步操作的替代以保持示例简单。
除了您没有完成源可观察量之外,代码是正确的。
toArray()
运算符只能在 observable 完成时工作,并且由于您没有完成 Rx.Observable.create
那么您的查询永远不会结束。
试试这个:
console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
return Rx.Observable.create((obs)=>{
let tid = setTimeout(()=>{
console.log(val + ' timing out');
obs.onNext(val);
obs.onCompleted();
},i*500);
return ()=>{
clearTimeout(tid);
};
}).publish().refCount();
});
Rx.Observable.from(valsArray)
.flatMap((v)=>v)
.toArray()
.subscribe((arr)=>{
console.log("arr should be ['a','b','c']",arr);
});
此外,作为旁注,.publish().refCount()
在这里似乎是错误的。这段代码中没有必要使源可观察量变热。
我正在使用 Rx.Observable.create()
创建一个异步可观察对象数组,并希望在它们完成时使用 .toArray()
获取所有值。
console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
return Rx.Observable.create((obs)=>{
let tid = setTimeout(()=>{
console.log(val + ' timing out');
obs.onNext(val);
},i*500);
return ()=>{
clearTimeout(tid);
};
}).publish().refCount();
});
Rx.Observable.from(valsArray)
.flatMap((v)=>v)
.toArray()
.subscribe((arr)=>{
console.log("arr should be ['a','b','c']",arr);
});
以上示例位于 http://jsbin.com/wegoha/10/edit?js,console。
使用 setTimeout
作为其他异步操作的替代以保持示例简单。
除了您没有完成源可观察量之外,代码是正确的。
toArray()
运算符只能在 observable 完成时工作,并且由于您没有完成 Rx.Observable.create
那么您的查询永远不会结束。
试试这个:
console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
return Rx.Observable.create((obs)=>{
let tid = setTimeout(()=>{
console.log(val + ' timing out');
obs.onNext(val);
obs.onCompleted();
},i*500);
return ()=>{
clearTimeout(tid);
};
}).publish().refCount();
});
Rx.Observable.from(valsArray)
.flatMap((v)=>v)
.toArray()
.subscribe((arr)=>{
console.log("arr should be ['a','b','c']",arr);
});
此外,作为旁注,.publish().refCount()
在这里似乎是错误的。这段代码中没有必要使源可观察量变热。