RxJS:如何不订阅初始值 and/or undefined?

RxJS: How to not subscribe to initial value and/or undefined?

作为 RxJS 的新手,我经常创建一个在未来具有价值的主题,但最初是 undefined。只能是undefined第一次。我目前使用 filter 来跳过 undefined 值,但这非常麻烦,因为我只需要一次 everywhere 。 (也许我在这里做错了什么?)只有在通过 onNext 获得第一个值后,我才能以某种方式订阅 mySubject 吗?

var mySubject = new Rx.BehaviorSubject(undefined);

mySubject.filter(function(value) {
  return value !== undefined;
}).subscribe(function(value) {
  // do something with the value
});

使用 new Rx.ReplaySubject(1) 代替 BehaviorSubject

所述,您应该可以使用跳过运算符跳过第一个值:

var mySubject = new Rx.BehaviorSubject(undefined);

mySubject.pipe(skip(1)).subscribe(function(value) {
  // do something with the value
});

目前我正在使用filter operator,但我不知道它是否是一个好的解决方案:

var mySubject = new Rx.BehaviorSubject().filter(x => !!x);

mySubject.subscribe(value => { /* will receive value from below */);

mySubject.next('value');

mySubject.subscribe(value => { /* also receives the value */ });

mySubject.pipe( skipWhile( v => !v ) );

有时会需要 behaviorSubject,其中初始值无关紧要,并且在流内部工作时异步需要当前值, 在我们的例子中,多个链承诺在处理时或在从流内的任何地方获取数据期间通过用户取消来处理。

这可以通过以下方式实现。

// for user related commands
this.commandSource = new BehaviorSubject(CONTINUE);
// filtering over initial value which is continue to make it as a different pipe
const stopPipe = commandSource.pipe(filter(val => val === STOP));
const fetchStream = Observable.fromPromise(this.fetchDetails);

merge(fetchStream, stopPipe).pipe(
 take(1),
 takeWhile(() => commandSource.value === CONTINUE),
 concatMap((response) => {
  // fetch Another response you can return promise directly in concatMap
  // return array of response [1 ,2 ,3];
  return this.fetchYetAnotherDetails;
 }),
 // we can add this to stop stream in multiple places while processing the response
 takeWhile(() => commandSource.value === CONTINUE),
 // triggers parallelly values from the concatMap that is 1, 2 , 3
 mergeMap(() => // massage the response parallelly using )
 finalize(() => thi
  commandSource.complete())
).subscribe(res => {
 // handle each response 1, 2, 3 mapped
}, () => {
 // handle error
}, () => {
 // handle complete of the stream
});

// when user, clicks cancel, this should stop the stream.
commandSource.next(STOP)

我发现这在 RxJS 和 RxSwift 中都令人沮丧。 (想要一个值主题并具有等待第一个值的能力)。

对于 JS,我目前只是在主题中隐藏了一个过滤版本,如下所示:

    let mySubject = new Rx.BehaviorSubject();
    mySubject.wait = mySubject.pipe(filter(v=>v!==undefined));

因此主题仍然可以发布,但客户不必重复筛选。

    mySubject.wait.subscribe((v)=>{...});