RxJS takeWhile 但包含最后一个值

RxJS takeWhile but include the last value

我的 RxJS5 管道看起来像这样

Rx.Observable.from([2, 3, 4, 5, 6])
  .takeWhile((v) => { v !== 4 })

我想保留订阅直到看到 4,但我希望最后一个元素 4 也包含在结果中。所以上面的例子应该是

2, 3, 4

但是,根据 official documenttakeWhile 运算符不包含在内。这意味着当它遇到与我们给定的谓词不匹配的元素时,它会立即完成流而没有最后一个元素。结果,上面的代码实际上会输出

2, 3

所以我的问题是,什么是最简单的方法我可以实现 takeWhile 同时使用 RxJS 发出最后一个元素?

从 RxJS 6.4.0 开始,现在可以使用 takeWhile(predicate, true)

已经有一个打开的 PR 将可选的 inclusive 参数添加到 takeWhilehttps://github.com/ReactiveX/rxjs/pull/4115

至少有两种可能的解决方法:

  1. 使用concatMap():

    of('red', 'blue', 'green', 'orange').pipe(
      concatMap(color => {
        if (color === 'green') {
          return of(color, null);
        }
        return of(color);
      }),
      takeWhile(color => color),
    )
    
  2. 使用multicast():

    of('red', 'blue', 'green', 'orange').pipe(
      multicast(
        () => new ReplaySubject(1),
        subject => subject.pipe(
          takeWhile((c) => c !== 'green'),
          concat(subject.take(1),
        )
      ),
    )
    

我也一直在使用这个运算符,所以我把它变成了我自己的一组额外的 RxJS 5 运算符:https://github.com/martinsik/rxjs-extra#takewhileinclusive

这个运算符也在这个 RxJS 5 问题中讨论过:https://github.com/ReactiveX/rxjs/issues/2420

2019 年 1 月:针对 RxJS 6 更新

如果您比较清楚最后一个元素是什么(例如 !==),您可以自己重新添加:

Rx.Observable.from([2, 3, 4, 5, 6])
  .takeWhile((v) => v !== 4)
  .concat(Rx.Observable.of(4))
  .subscribe(console.log)

我遇到了同样的问题,我需要包含最后一个元素,所以我选择保留对订阅的引用并取消订阅 onNext 回调 当条件满足时。使用您的示例代码将是:

const subscription = Observable.of('red', 'blue', 'green', 'orange')
  .subscribe(color => {
    // Do something with the value here
    if (color === 'green') {
      subscription.unsubscribe()
    }
  }) 

这对我有用,因为它还导致可观察源停止发射,这正是我在我的场景中所需要的。 我意识到我没有使用 takeWhile 运算符,但主要目标已经实现并且没有任何变通方法或额外代码。 我不喜欢强迫事物以非设计的方式工作。 这样做的缺点是:

  • 如果有任何其他观察者订阅,源将继续发射。
  • 如果最后一个观察者取消订阅,onCompleted 不会因为某种原因被调用,但我检查了源实际上停止发射。

你可以使用 endWith(value) 哪个(不像很多 RxJS 代码) 非常漂亮的自我记录。

const example = source.pipe(
                            takeWhile(val => val != 4), 
                            endWith(4));

PS。另请注意 takeUntil 不采用谓词,因此如果您试图使用该运算符来解决此问题,则不能。这是一个完全不同的方法签名。

官方文档: https://rxjs-dev.firebaseapp.com/api/operators/endWith

https://stackblitz.com/edit/typescript-pvuawt

就我而言,我无法预测最终值是多少。我还只是想要一个涉及常见、简单的运算符的解决方案,并且我想要一些我可以重复使用的东西,所以我不能依赖这些值是真实的。我唯一能想到的就是像这样定义我自己的运算符:

import { pipe, from } from 'rxjs';
import { switchMap, takeWhile, filter, map } from 'rxjs/operators';

export function doWhile<T>(shouldContinue: (a: T) => boolean) {
  return pipe(
    switchMap((data: T) => from([
      { data, continue: true },
      { data, continue: shouldContinue(data), exclude: true }
    ])),
    takeWhile(message => message.continue),
    filter(message => !message.exclude),
    map(message => message.data)
  );
}

有点奇怪,但对我有用,我可以导入并使用它。

2019 年 3 月更新,rsjx 版本 6.4.0takeWhile 终于有了一个可选的 inclusive 参数,允许保留第一个打破条件的元素。所以现在解决方案就是将 true 作为 takeWhile:

的第二个参数传递
import { takeWhile } from 'rxjs/operators';
import { from } from 'rxjs';

const cutOff = 4.5
const example = from([2, 3, 4, 5, 6])
.pipe(takeWhile(v => v < cutOff, true ))
const subscribe = example.subscribe(val =>
  console.log('inclusive:', val)
);

输出:

inclusive: 2
inclusive: 3
inclusive: 4
inclusive: 5

住在这里:

https://stackblitz.com/edit/typescript-m7zjkr?embed=1&file=index.ts

注意 5 是第一个打破条件的元素。请注意,当您具有 v < cutOff 之类的动态条件并且您不知道最后一个元素是什么时,endWith 并不是真正的解决方案。

感谢@martin 指出此拉取请求的存在。