使用 RxJS 操作项目流以获得流式项目数组
Using RxJS to manipulate a stream of items to obtain an array of streamed items
我对 rxjs 有点陌生,无法解决这个问题:
我有两个流:
- 一个有传入对象
- ---a----b----c----d----->
- 一个与从列表中选择的对象
- --------------------c---->
从传入的对象流中生成对象列表流(使用扫描运算符)
incoming: ----a--------b-------c----------d----------------\>
list: -------[a]----[a,b]----[a,b,c]----[a,b,c,d]---------\>
当列表对象被选中 (n) 时,开始新的流
- 新流的第一个值是切片列表的最后一个值 (list.slice(n))
incoming: ----a--------b-------c----------d--------------------e-------->
list: -------[a]----[a,b]----[a,b,c]----[a,b,c,d]--------->
selected object: ---------------------------------c------->
new stream of list: ------[c,d]-----[c,d,e]--->
选中对象时无法获取列表流的最后一个值,
为了更好地理解,做了一个弹珠图,
selectedObject$ = new BehaviorSubject(0);
incomingObjects$ = new Subject();
list$ = incomingObjects$.pipe(
scan((acc, val) => {
acc.push(val);
return acc;
}, [])
)
newList$ = selectedObject$.pipe(
withLastFrom(list$),
switchMap(([index,list])=> incomingObjects$.pipe(
scan((acc, val) => {
acc.push(val);
return acc;
}, list.slice(index))
))
)
如果我理解正确的话,你可以按照下面的方法来做。
关键是要认识到 list
Observable(即使用 scan
获得的 Observable)应该是 hot Observable,即一个 Observable 通知独立于它是否被订阅。原因是您要创建的每个新流都应该始终具有与其上游相同的源 Observable。
然后,正如您已经暗示的那样,select值的行为应该用 BehaviorSubject 建模。
一旦 select BehaviorSubject 通知一个值 selected,前一个流必须 complete
并且必须订阅一个新的。这是 switchMap
.
的工作
剩下的就是 slice
数字数组的正确方法。
这是该方法的完整代码
const selectedObject$ = new BehaviorSubject(1);
const incomingObjects$ = interval(1000).pipe(take(10));
const incomingObjectsHot$ = new ReplaySubject<number[]>(1);
incomingObjects$
.pipe(
scan((acc, val) => {
acc.push(val);
return acc;
}, [])
)
.subscribe(incomingObjectsHot$);
selectedObject$
.pipe(
switchMap((selected) =>
incomingObjectsHot$.pipe(
map((nums) => {
const selIndex = nums.indexOf(selected);
if (selIndex > 0) {
return nums.slice(selIndex);
}
})
)
),
filter(v => !!v)
)
.subscribe(console.log);
示例见this stackblitz。
我与 scan 运算符一起使用的一个常见模式是传递 reducer 函数而不是要扫描的值,以便可以在更新操作中使用当前值。在这种情况下,您可以 link 使用 merge 运算符的两个可观察对象,并将它们的值映射到适当的函数 - 添加到列表,或在选择后切片列表.
// these are just timers for demonstration, any observable should be fine.
const incoming$ = timer(1000, 1000).pipe(map(x => String.fromCharCode(x + 65)), take(10));
const selected$ = timer(3000, 3000).pipe(map(x => String.fromCharCode(x * 2 + 66)), take(2));
merge(
incoming$.pipe(map(x => (s) => [...s, x])), // append to list
selected$.pipe(map(x => (s) => { // slice list starting from selection
const index = s.indexOf(x);
return (index !== -1) ? s.slice(index) : s;
}))
).pipe(
scan((list, reducer) => reducer(list), []) // run reducer
).subscribe(x => console.log(x)); // display list state as demonstration.
我对 rxjs 有点陌生,无法解决这个问题:
我有两个流:
- 一个有传入对象
- ---a----b----c----d----->
- 一个与从列表中选择的对象
- --------------------c---->
从传入的对象流中生成对象列表流(使用扫描运算符)
incoming: ----a--------b-------c----------d----------------\>
list: -------[a]----[a,b]----[a,b,c]----[a,b,c,d]---------\>
当列表对象被选中 (n) 时,开始新的流
- 新流的第一个值是切片列表的最后一个值 (list.slice(n))
incoming: ----a--------b-------c----------d--------------------e-------->
list: -------[a]----[a,b]----[a,b,c]----[a,b,c,d]--------->
selected object: ---------------------------------c------->
new stream of list: ------[c,d]-----[c,d,e]--->
选中对象时无法获取列表流的最后一个值,
为了更好地理解,做了一个弹珠图,
selectedObject$ = new BehaviorSubject(0);
incomingObjects$ = new Subject();
list$ = incomingObjects$.pipe(
scan((acc, val) => {
acc.push(val);
return acc;
}, [])
)
newList$ = selectedObject$.pipe(
withLastFrom(list$),
switchMap(([index,list])=> incomingObjects$.pipe(
scan((acc, val) => {
acc.push(val);
return acc;
}, list.slice(index))
))
)
如果我理解正确的话,你可以按照下面的方法来做。
关键是要认识到 list
Observable(即使用 scan
获得的 Observable)应该是 hot Observable,即一个 Observable 通知独立于它是否被订阅。原因是您要创建的每个新流都应该始终具有与其上游相同的源 Observable。
然后,正如您已经暗示的那样,select值的行为应该用 BehaviorSubject 建模。
一旦 select BehaviorSubject 通知一个值 selected,前一个流必须 complete
并且必须订阅一个新的。这是 switchMap
.
剩下的就是 slice
数字数组的正确方法。
这是该方法的完整代码
const selectedObject$ = new BehaviorSubject(1);
const incomingObjects$ = interval(1000).pipe(take(10));
const incomingObjectsHot$ = new ReplaySubject<number[]>(1);
incomingObjects$
.pipe(
scan((acc, val) => {
acc.push(val);
return acc;
}, [])
)
.subscribe(incomingObjectsHot$);
selectedObject$
.pipe(
switchMap((selected) =>
incomingObjectsHot$.pipe(
map((nums) => {
const selIndex = nums.indexOf(selected);
if (selIndex > 0) {
return nums.slice(selIndex);
}
})
)
),
filter(v => !!v)
)
.subscribe(console.log);
示例见this stackblitz。
我与 scan 运算符一起使用的一个常见模式是传递 reducer 函数而不是要扫描的值,以便可以在更新操作中使用当前值。在这种情况下,您可以 link 使用 merge 运算符的两个可观察对象,并将它们的值映射到适当的函数 - 添加到列表,或在选择后切片列表.
// these are just timers for demonstration, any observable should be fine.
const incoming$ = timer(1000, 1000).pipe(map(x => String.fromCharCode(x + 65)), take(10));
const selected$ = timer(3000, 3000).pipe(map(x => String.fromCharCode(x * 2 + 66)), take(2));
merge(
incoming$.pipe(map(x => (s) => [...s, x])), // append to list
selected$.pipe(map(x => (s) => { // slice list starting from selection
const index = s.indexOf(x);
return (index !== -1) ? s.slice(index) : s;
}))
).pipe(
scan((list, reducer) => reducer(list), []) // run reducer
).subscribe(x => console.log(x)); // display list state as demonstration.