使用 RxJS 操作项目流以获得流式项目数组

Using RxJS to manipulate a stream of items to obtain an array of streamed items

我对 rxjs 有点陌生,无法解决这个问题:

我有两个流:

从传入的对象流中生成对象列表流(使用扫描运算符)

  incoming: ----a--------b-------c----------d----------------\>
  list:  -------[a]----[a,b]----[a,b,c]----[a,b,c,d]---------\>

当列表对象被选中 (n) 时,开始新的流

incoming: ----a--------b-------c----------d--------------------e-------->
list:  -------[a]----[a,b]----[a,b,c]----[a,b,c,d]--------->
selected object:  ---------------------------------c------->

new stream of list:                           ------[c,d]-----[c,d,e]--->

选中对象时无法获取列表流的最后一个值, 为了更好地理解,做了一个弹珠图,

selectedObject$ =  new BehaviorSubject(0);
incomingObjects$ = new Subject();

list$ = incomingObjects$.pipe(
          scan((acc, val) => {
            acc.push(val);
            return acc;
          }, [])
        )
newList$ = selectedObject$.pipe(
            withLastFrom(list$),
          switchMap(([index,list])=> incomingObjects$.pipe(
             scan((acc, val) => {
              acc.push(val);
              return acc;
             }, list.slice(index))
          ))
        )

如果我理解正确的话,你可以按照下面的方法来做。

关键是要认识到 list Observable(即使用 scan 获得的 Observable)应该是 hot Observable,即一个 Observable 通知独立于它是否被订阅。原因是您要创建的每个新流都应该始终具有与其上游相同的源 Observable。

然后,正如您已经暗示的那样,select值的行为应该用 BehaviorSubject 建模。

一旦 select BehaviorSubject 通知一个值 selected,前一个流必须 complete 并且必须订阅一个新的。这是 switchMap.

的工作

剩下的就是 slice 数字数组的正确方法。

这是该方法的完整代码

const selectedObject$ = new BehaviorSubject(1);
const incomingObjects$ = interval(1000).pipe(take(10));
const incomingObjectsHot$ = new ReplaySubject<number[]>(1);

incomingObjects$
  .pipe(
    scan((acc, val) => {
      acc.push(val);
      return acc;
    }, [])
  )
  .subscribe(incomingObjectsHot$);

selectedObject$
  .pipe(
    switchMap((selected) =>
      incomingObjectsHot$.pipe(
        map((nums) => {
          const selIndex = nums.indexOf(selected);
          if (selIndex > 0) {
            return nums.slice(selIndex);
          }
        })
      )
    ),
    filter(v => !!v)
  )
  .subscribe(console.log);

示例见this stackblitz

我与 scan 运算符一起使用的一个常见模式是传递 reducer 函数而不是要扫描的值,以便可以在更新操作中使用当前值。在这种情况下,您可以 link 使用 merge 运算符的两个可观察对象,并将它们的值映射到适当的函数 - 添加到列表,或在选择后切片列表.

// these are just timers for demonstration, any observable should be fine.
const incoming$ = timer(1000, 1000).pipe(map(x => String.fromCharCode(x + 65)), take(10));
const selected$ = timer(3000, 3000).pipe(map(x => String.fromCharCode(x * 2 + 66)), take(2));

merge(
  incoming$.pipe(map(x => (s) => [...s, x])), // append to list
  selected$.pipe(map(x => (s) => { // slice list starting from selection
    const index = s.indexOf(x);
    return (index !== -1) ? s.slice(index) : s;
  }))
).pipe(
  scan((list, reducer) => reducer(list), []) // run reducer
).subscribe(x => console.log(x)); // display list state as demonstration.