等待直到第二个 Observable 发出
wait Until a second Observable emits
在 Rxjs 中,有管道 takeUntil 但没有管道 wait Until,这使得当前的 Observable 等待第二个 Observable 发出。
我的最终目标是让许多 Observable 仍在等待,直到我的 Observable init$ 只发出一个值,才能继续执行。因此我的 Observable init$ 必须执行一次,在此之前我的其他 observable 必须等到 inits 发出任何不同于 null 的值。
在这个简单的例子中,我想添加一个管道到 pipedSource 做 wait Until init$ ,所以源必须等到 init$ 发出才能发出它的值。
import { interval, timer, Subject, combineLatest } from 'rxjs';
import { takeUntil, skipWhile, skipUntil, concatMap, map, take } from 'rxjs/operators';
import { BehaviorSubject } from 'rxjs';
const init$ = new BehaviorSubject(null);
const source = new BehaviorSubject(null);
const pipedSource = source
.pipe(
skipWhile((res)=> res === null)
//wait until init$ emits a non null value
)
//first subscription to source
pipedSource.subscribe(val => console.log(val));
source.next({profile:"me"});
//init emits once
setTimeout(()=>{
init$.next(1);
},2000);
// a second subscription to source
setTimeout(()=>{
pipedSource.subscribe(val => console.log(val));
},3000);
想要的结果:
//after 2s of waiting
//first subscription returns "profile"
//after 3s
//second subscription returns "profile"
您想 运行 当第一个可观察对象发出非空值时,第二个可观察对象。为此,请在 skipWhile
.
之后使用 concatMap
或 switchMap
ngOnInit() {
const init$ = new BehaviorSubject(null);
const source = new BehaviorSubject({profile:"me"});
const pipedSource = init$
.pipe(
skipWhile((res)=> res === null),
concatMap(() => source)
);
pipedSource.subscribe(val => console.log('first', val));
//init emits once
setTimeout(()=>{
init$.next(1);
},2000);
// a second subscription to source
setTimeout(()=>{
pipedSource.subscribe(val => console.log('second', val));
}, 3000);
}
这里我先订阅 init$
observable,等待它发出一个非空值,然后切换到 source
observable。
如果我没看错你的问题,我看到了 2 个潜在案例。
第一个是您的 source
Observable 开始独立于 wait$
发出其值。当 wait$
发出时,您才开始使用 source
发出的值。这种行为可以使用 combineLatest
函数来实现,像这样
//emits value every 500ms
const source$ = interval(500);
combineLatest(source$, wait$)
.pipe(
map(([s, v]) => s), // to filter out the value emitted by wait$
take(5), // just to limit to 5 the values emitted
)
.subscribe(val => console.log(val));
setTimeout(() => {
wait$.next(1);
}, 2000);
在这种情况下,您在控制台上看到的是从 2
开始的序列,因为 wait$
在 2 秒后发出。
第二种情况是您希望 source
仅在 wait$
发出后才开始发出其值。在这种情况下,您可以使用 switchMap
运算符,例如此处
const wait_2$ = new Subject();
//emits value every 500ms
const source_2$ = interval(500);
wait_2$
.pipe(
switchMap(() => source_2$),
take(5), // just to limit to 5 the values emitted
)
.subscribe(val => console.log(val));
setTimeout(() => {
wait_2$.next(1);
}, 4000);
在这种情况下,4 秒后,控制台上会打印一个以 0
开头的序列。
在 Rxjs 中,有管道 takeUntil 但没有管道 wait Until,这使得当前的 Observable 等待第二个 Observable 发出。
我的最终目标是让许多 Observable 仍在等待,直到我的 Observable init$ 只发出一个值,才能继续执行。因此我的 Observable init$ 必须执行一次,在此之前我的其他 observable 必须等到 inits 发出任何不同于 null 的值。
在这个简单的例子中,我想添加一个管道到 pipedSource 做 wait Until init$ ,所以源必须等到 init$ 发出才能发出它的值。
import { interval, timer, Subject, combineLatest } from 'rxjs';
import { takeUntil, skipWhile, skipUntil, concatMap, map, take } from 'rxjs/operators';
import { BehaviorSubject } from 'rxjs';
const init$ = new BehaviorSubject(null);
const source = new BehaviorSubject(null);
const pipedSource = source
.pipe(
skipWhile((res)=> res === null)
//wait until init$ emits a non null value
)
//first subscription to source
pipedSource.subscribe(val => console.log(val));
source.next({profile:"me"});
//init emits once
setTimeout(()=>{
init$.next(1);
},2000);
// a second subscription to source
setTimeout(()=>{
pipedSource.subscribe(val => console.log(val));
},3000);
想要的结果:
//after 2s of waiting
//first subscription returns "profile"
//after 3s
//second subscription returns "profile"
您想 运行 当第一个可观察对象发出非空值时,第二个可观察对象。为此,请在 skipWhile
.
concatMap
或 switchMap
ngOnInit() {
const init$ = new BehaviorSubject(null);
const source = new BehaviorSubject({profile:"me"});
const pipedSource = init$
.pipe(
skipWhile((res)=> res === null),
concatMap(() => source)
);
pipedSource.subscribe(val => console.log('first', val));
//init emits once
setTimeout(()=>{
init$.next(1);
},2000);
// a second subscription to source
setTimeout(()=>{
pipedSource.subscribe(val => console.log('second', val));
}, 3000);
}
这里我先订阅 init$
observable,等待它发出一个非空值,然后切换到 source
observable。
如果我没看错你的问题,我看到了 2 个潜在案例。
第一个是您的 source
Observable 开始独立于 wait$
发出其值。当 wait$
发出时,您才开始使用 source
发出的值。这种行为可以使用 combineLatest
函数来实现,像这样
//emits value every 500ms
const source$ = interval(500);
combineLatest(source$, wait$)
.pipe(
map(([s, v]) => s), // to filter out the value emitted by wait$
take(5), // just to limit to 5 the values emitted
)
.subscribe(val => console.log(val));
setTimeout(() => {
wait$.next(1);
}, 2000);
在这种情况下,您在控制台上看到的是从 2
开始的序列,因为 wait$
在 2 秒后发出。
第二种情况是您希望 source
仅在 wait$
发出后才开始发出其值。在这种情况下,您可以使用 switchMap
运算符,例如此处
const wait_2$ = new Subject();
//emits value every 500ms
const source_2$ = interval(500);
wait_2$
.pipe(
switchMap(() => source_2$),
take(5), // just to limit to 5 the values emitted
)
.subscribe(val => console.log(val));
setTimeout(() => {
wait_2$.next(1);
}, 4000);
在这种情况下,4 秒后,控制台上会打印一个以 0
开头的序列。