等待直到第二个 Observable 发出

wait Until a second Observable emits

在 Rxjs 中,有管道 takeUntil 但没有管道 wait Until,这使得当前的 Observable 等待第二个 Observable 发出。

我的最终目标是让许多 Observable 仍在等待,直到我的 Observable init$ 只发出一个值,才能继续执行。因此我的 Observable init$ 必须执行一次,在此之前我的其他 observable 必须等到 inits 发出任何不同于 null 的值。

在这个简单的例子中,我想添加一个管道到 pipedSourcewait Until init$ ,所以源必须等到 init$ 发出才能发出它的值。

import { interval, timer, Subject, combineLatest } from 'rxjs';
import { takeUntil, skipWhile, skipUntil, concatMap, map, take } from 'rxjs/operators';
import { BehaviorSubject } from 'rxjs';

const init$ = new BehaviorSubject(null);

const source = new BehaviorSubject(null);
const pipedSource = source
.pipe(
    skipWhile((res)=> res === null)
    //wait until init$ emits a non null value
)

//first subscription to source
pipedSource.subscribe(val => console.log(val));

source.next({profile:"me"});

//init emits once
setTimeout(()=>{
  init$.next(1);
},2000);

// a second subscription to source
setTimeout(()=>{
  pipedSource.subscribe(val => console.log(val));
},3000);

想要的结果:

//after 2s of waiting
//first subscription returns "profile"
//after 3s
//second subscription returns "profile" 

您想 运行 当第一个可观察对象发出非空值时,第二个可观察对象。为此,请在 skipWhile.

之后使用 concatMapswitchMap
ngOnInit() {
  const init$ = new BehaviorSubject(null);

  const source = new BehaviorSubject({profile:"me"});
  const pipedSource = init$
  .pipe(
      skipWhile((res)=> res === null),
      concatMap(() => source)
  );

  pipedSource.subscribe(val => console.log('first', val));

  //init emits once
  setTimeout(()=>{
    init$.next(1);
  },2000);

  // a second subscription to source
  setTimeout(()=>{
    pipedSource.subscribe(val => console.log('second', val));
  }, 3000);
}

这里我先订阅 init$ observable,等待它发出一个非空值,然后切换到 source observable。

演示:https://stackblitz.com/edit/angular-p7kftd

如果我没看错你的问题,我看到了 2 个潜在案例。

第一个是您的 source Observable 开始独立于 wait$ 发出其值。当 wait$ 发出时,您才开始使用 source 发出的值。这种行为可以使用 combineLatest 函数来实现,像这样

//emits value every 500ms
const source$ = interval(500);

combineLatest(source$, wait$)
    .pipe(
        map(([s, v]) => s), // to filter out the value emitted by wait$
        take(5), // just to limit to 5 the values emitted
    )
    .subscribe(val => console.log(val));

setTimeout(() => {
    wait$.next(1);
}, 2000);

在这种情况下,您在控制台上看到的是从 2 开始的序列,因为 wait$ 在 2 秒后发出。

第二种情况是您希望 source 仅在 wait$ 发出后才开始发出其值。在这种情况下,您可以使用 switchMap 运算符,例如此处

const wait_2$ = new Subject();

//emits value every 500ms
const source_2$ = interval(500);

wait_2$
    .pipe(
        switchMap(() => source_2$),
        take(5), // just to limit to 5 the values emitted
    )
    .subscribe(val => console.log(val));

setTimeout(() => {
    wait_2$.next(1);
}, 4000);

在这种情况下,4 秒后,控制台上会打印一个以 0 开头的序列。