如何让一个 Observable 序列在发射之前等待另一个序列完成?
How to make one Observable sequence wait for another to complete before emitting?
假设我有一个 Observable
,像这样:
var one = someObservable.take(1);
one.subscribe(function(){ /* do something */ });
那我还有第二个Observable
:
var two = someOtherObservable.take(1);
现在,我想 subscribe()
到 two
,但我想确保 one
在 two
订阅者被解雇之前完成。
在two
上可以用什么样的缓冲方式让第二个等待第一个完成?
我想我想暂停 two
直到 one
完成。
我能想到的几种方法
import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'
//Method one
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});
//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
如果第二个 observable 是 hot,则有 another way 要做 pause/resume:
var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);
source1.doOnCompleted(function () {
/* resume paused source2 */
pauser.onNext(true);
}).subscribe(function(){
// do something
});
source2.subscribe(function(){
// start to recieve data
});
您还可以使用缓冲版本 pausableBuffered 在暂停期间保留数据。
如果你想确保执行的顺序被保留,你可以使用 flatMap 作为下面的例子
const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));
first
.flatMap(() => second)
.flatMap(() => third)
.subscribe(()=> console.log('finished'));
结果将是:
"1"
"11"
"111"
"finished"
这是利用 switchMap 的结果选择器的另一种可能性
var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
/** Wait for first Observable */
() => one$,
/** Only return the value we're actually interested in */
(value2, value1) => value2
)
.subscribe((value2) => {
/* do something */
});
由于 switchMap 的结果选择器已经过时,这里是一个更新版本
const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
take(1),
switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
/* do something */
});
这是另一种方法,但我觉得更直接和直观(或者至少是自然的,如果你习惯了 Promises)。基本上,您使用 Observable.create()
创建一个 Observable,将 one
和 two
包装为单个 Observable。这与 Promise.all()
的工作方式非常相似。
var first = someObservable.take(1);
var second = Observable.create((observer) => {
return first.subscribe(
function onNext(value) {
/* do something with value like: */
// observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
someOtherObservable.take(1).subscribe(
function onNext(value) {
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
observer.complete();
}
);
}
);
});
那么,这是怎么回事?首先,我们创建一个新的 Observable。传递给 Observable.create()
的函数,恰当地命名为 onSubscription
,被传递给观察者(根据您传递给 subscribe()
的参数构建),这类似于 resolve
和 reject
在创建新的 Promise 时组合成一个对象。这就是我们让魔法发挥作用的方式。
在onSubscription
中,我们订阅了第一个Observable(在上面的例子中,它被称为one
)。我们如何处理 next
和 error
取决于您,但我的示例中提供的默认设置一般来说应该是合适的。然而,当我们收到 complete
事件时,这意味着 one
现在已经完成,我们可以订阅下一个 Observable;从而在第一个 Observable 完成后触发第二个 Observable。
为第二个 Observable 提供的示例观察器非常简单。基本上,second
现在的行为就像您期望 two
在 OP 中的行为一样。更具体地说,假设没有错误,second
将发出第一个且仅是 someOtherObservable
发出的第一个值(因为 take(1)
)然后完成。
例子
如果您想在现实生活中看到我的示例,您可以copy/paste这里是一个完整的工作示例:
var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);
var first = someObservable.take(1);
var second = Observable.create((observer) => {
return first.subscribe(
function onNext(value) {
/* do something with value like: */
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
someOtherObservable.take(1).subscribe(
function onNext(value) {
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
observer.complete();
}
);
}
);
}).subscribe(
function onNext(value) {
console.log(value);
},
function onError(error) {
console.error(error);
},
function onComplete() {
console.log("Done!");
}
);
如果你看控制台,上面的例子会打印:
1
6
Done!
skipUntil() 和 last()
skipUntil:忽略发出的项目,直到另一个 observable 发出
last:发出序列中的最后一个值(即等到它完成然后发出)
请注意,从传递给 skipUntil
的可观察对象发出的任何内容都会取消跳过,这就是为什么我们需要添加 last()
- 等待流完成。
main$.skipUntil(sequence2$.pipe(last()))
官方:https://rxjs-dev.firebaseapp.com/api/operators/skipUntil
可能的问题:请注意 last()
本身 will error 如果什么都没有发出。 last()
运算符确实有一个 default
参数,但仅当与谓词结合使用时。我认为如果这种情况对你来说是个问题(如果 sequence2$
可能会在不发出信号的情况下完成)那么其中一个应该可以工作(目前未经测试):
main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))
请注意,undefined
是要发出的有效项目,但实际上可以是任何值。另请注意,这是连接到 sequence2$
的管道,而不是 main$
管道。
这是一种可重复使用的方法(它是打字稿,但您可以将其改编为 js):
function waitFor<T>(signal: Observable<any>) {
return (source: Observable<T>) => signal.pipe(
first(),
switchMap(_ => source),
);
}
您可以像任何运算符一样使用它:
var two = someOtherObservable.pipe(waitFor(one), take(1));
它基本上是一个运算符,可以推迟对源可观察对象的订阅,直到信号可观察对象发出第一个事件。
由于 mergeMap(或他的别名 flatMap)运算符,您可以使用之前 Observable 发出的结果,如下所示:
const one = Observable.of('https://api.github.com/users');
const two = (c) => ajax(c);//ajax from Rxjs/dom library
one.mergeMap(two).subscribe(c => console.log(c))
好吧,我知道这已经很老了,但我认为您可能需要的是:
var one = someObservable.take(1);
var two = someOtherObservable.pipe(
concatMap((twoRes) => one.pipe(mapTo(twoRes))),
take(1)
).subscribe((twoRes) => {
// one is completed and we get two's subscription.
})
这是一个用 TypeScript 编写的自定义运算符,它在发出结果之前等待信号:
export function waitFor<T>(
signal$: Observable<any>
) {
return (source$: Observable<T>) =>
new Observable<T>(observer => {
// combineLatest emits the first value only when
// both source and signal emitted at least once
combineLatest([
source$,
signal$.pipe(
first(),
),
])
.subscribe(([v]) => observer.next(v));
});
}
你可以这样使用它:
two.pipe(waitFor(one))
.subscribe(value => ...);
也许您可以使用 delayWhen
运算符。
我们有两个可观察值 one$
和 two$
。第一个 observable 在 1s 延迟后发出 1
然后完成。第二个 Observable 仅在 one$
发出后才发出 2
:
const one$ = of(1).pipe(
delay(1000),
tap(() => console.log('one$ emitted'))
);
const two$ = of(2).pipe(
delayWhen(() => one$),
tap(() => console.log('two$ emitted')),
);
two$.subscribe(n => {
console.log(`n=${n}`);
});
<script src="https://unpkg.com/rxjs@7.5.5/dist/bundles/rxjs.umd.min.js"></script>
<script>
const {of} = rxjs;
const {delay, delayWhen, tap} = rxjs.operators;
</script>
假设我有一个 Observable
,像这样:
var one = someObservable.take(1);
one.subscribe(function(){ /* do something */ });
那我还有第二个Observable
:
var two = someOtherObservable.take(1);
现在,我想 subscribe()
到 two
,但我想确保 one
在 two
订阅者被解雇之前完成。
在two
上可以用什么样的缓冲方式让第二个等待第一个完成?
我想我想暂停 two
直到 one
完成。
我能想到的几种方法
import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'
//Method one
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});
//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
如果第二个 observable 是 hot,则有 another way 要做 pause/resume:
var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);
source1.doOnCompleted(function () {
/* resume paused source2 */
pauser.onNext(true);
}).subscribe(function(){
// do something
});
source2.subscribe(function(){
// start to recieve data
});
您还可以使用缓冲版本 pausableBuffered 在暂停期间保留数据。
如果你想确保执行的顺序被保留,你可以使用 flatMap 作为下面的例子
const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));
first
.flatMap(() => second)
.flatMap(() => third)
.subscribe(()=> console.log('finished'));
结果将是:
"1"
"11"
"111"
"finished"
这是利用 switchMap 的结果选择器的另一种可能性
var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
/** Wait for first Observable */
() => one$,
/** Only return the value we're actually interested in */
(value2, value1) => value2
)
.subscribe((value2) => {
/* do something */
});
由于 switchMap 的结果选择器已经过时,这里是一个更新版本
const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
take(1),
switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
/* do something */
});
这是另一种方法,但我觉得更直接和直观(或者至少是自然的,如果你习惯了 Promises)。基本上,您使用 Observable.create()
创建一个 Observable,将 one
和 two
包装为单个 Observable。这与 Promise.all()
的工作方式非常相似。
var first = someObservable.take(1);
var second = Observable.create((observer) => {
return first.subscribe(
function onNext(value) {
/* do something with value like: */
// observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
someOtherObservable.take(1).subscribe(
function onNext(value) {
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
observer.complete();
}
);
}
);
});
那么,这是怎么回事?首先,我们创建一个新的 Observable。传递给 Observable.create()
的函数,恰当地命名为 onSubscription
,被传递给观察者(根据您传递给 subscribe()
的参数构建),这类似于 resolve
和 reject
在创建新的 Promise 时组合成一个对象。这就是我们让魔法发挥作用的方式。
在onSubscription
中,我们订阅了第一个Observable(在上面的例子中,它被称为one
)。我们如何处理 next
和 error
取决于您,但我的示例中提供的默认设置一般来说应该是合适的。然而,当我们收到 complete
事件时,这意味着 one
现在已经完成,我们可以订阅下一个 Observable;从而在第一个 Observable 完成后触发第二个 Observable。
为第二个 Observable 提供的示例观察器非常简单。基本上,second
现在的行为就像您期望 two
在 OP 中的行为一样。更具体地说,假设没有错误,second
将发出第一个且仅是 someOtherObservable
发出的第一个值(因为 take(1)
)然后完成。
例子
如果您想在现实生活中看到我的示例,您可以copy/paste这里是一个完整的工作示例:
var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);
var first = someObservable.take(1);
var second = Observable.create((observer) => {
return first.subscribe(
function onNext(value) {
/* do something with value like: */
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
someOtherObservable.take(1).subscribe(
function onNext(value) {
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
observer.complete();
}
);
}
);
}).subscribe(
function onNext(value) {
console.log(value);
},
function onError(error) {
console.error(error);
},
function onComplete() {
console.log("Done!");
}
);
如果你看控制台,上面的例子会打印:
1
6
Done!
skipUntil() 和 last()
skipUntil:忽略发出的项目,直到另一个 observable 发出
last:发出序列中的最后一个值(即等到它完成然后发出)
请注意,从传递给 skipUntil
的可观察对象发出的任何内容都会取消跳过,这就是为什么我们需要添加 last()
- 等待流完成。
main$.skipUntil(sequence2$.pipe(last()))
官方:https://rxjs-dev.firebaseapp.com/api/operators/skipUntil
可能的问题:请注意 last()
本身 will error 如果什么都没有发出。 last()
运算符确实有一个 default
参数,但仅当与谓词结合使用时。我认为如果这种情况对你来说是个问题(如果 sequence2$
可能会在不发出信号的情况下完成)那么其中一个应该可以工作(目前未经测试):
main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))
请注意,undefined
是要发出的有效项目,但实际上可以是任何值。另请注意,这是连接到 sequence2$
的管道,而不是 main$
管道。
这是一种可重复使用的方法(它是打字稿,但您可以将其改编为 js):
function waitFor<T>(signal: Observable<any>) {
return (source: Observable<T>) => signal.pipe(
first(),
switchMap(_ => source),
);
}
您可以像任何运算符一样使用它:
var two = someOtherObservable.pipe(waitFor(one), take(1));
它基本上是一个运算符,可以推迟对源可观察对象的订阅,直到信号可观察对象发出第一个事件。
由于 mergeMap(或他的别名 flatMap)运算符,您可以使用之前 Observable 发出的结果,如下所示:
const one = Observable.of('https://api.github.com/users');
const two = (c) => ajax(c);//ajax from Rxjs/dom library
one.mergeMap(two).subscribe(c => console.log(c))
好吧,我知道这已经很老了,但我认为您可能需要的是:
var one = someObservable.take(1);
var two = someOtherObservable.pipe(
concatMap((twoRes) => one.pipe(mapTo(twoRes))),
take(1)
).subscribe((twoRes) => {
// one is completed and we get two's subscription.
})
这是一个用 TypeScript 编写的自定义运算符,它在发出结果之前等待信号:
export function waitFor<T>(
signal$: Observable<any>
) {
return (source$: Observable<T>) =>
new Observable<T>(observer => {
// combineLatest emits the first value only when
// both source and signal emitted at least once
combineLatest([
source$,
signal$.pipe(
first(),
),
])
.subscribe(([v]) => observer.next(v));
});
}
你可以这样使用它:
two.pipe(waitFor(one))
.subscribe(value => ...);
也许您可以使用 delayWhen
运算符。
我们有两个可观察值 one$
和 two$
。第一个 observable 在 1s 延迟后发出 1
然后完成。第二个 Observable 仅在 one$
发出后才发出 2
:
const one$ = of(1).pipe(
delay(1000),
tap(() => console.log('one$ emitted'))
);
const two$ = of(2).pipe(
delayWhen(() => one$),
tap(() => console.log('two$ emitted')),
);
two$.subscribe(n => {
console.log(`n=${n}`);
});
<script src="https://unpkg.com/rxjs@7.5.5/dist/bundles/rxjs.umd.min.js"></script>
<script>
const {of} = rxjs;
const {delay, delayWhen, tap} = rxjs.operators;
</script>