Combining/merging 观察值
Combining/merging observables
假设我正在开发一个聊天应用程序。我有可观察的 threads$
每 n 秒发出线程数组,可观察的 offline$
通知线程何时脱机,可观察的 online$
通知何时一个线程上线:
enum ConnectionStatus { Offline = 0, Online }
interface Thread {
id: string;
status: ConnectionStatus
}
const threads$ = Observable
.interval(n)
.switchMap(() => Observable.create((observer: Observer<Array<Thread>>) =>
getThreads((threads: Array<Thread>) => observer.next(threads))));
const online$ = Observable.create((observer: Observer<Thread>) =>
onOnline((threadId: string) => observer.next({
id: threadId,
status: ConnectionStatus.Online
})));
const offline$ = Observable.create((observer: Observer<Thread>) =>
onOffline((threadId: string) => observer.next({
id: threadId,
status: ConnectionStatus.Offline
})));
我想按照 规则 合并这些流:threads$
应该每 n 秒发出一次数组,但是每当 online$
或 offline$
发出,我想获取 threads$
的最新值 (Array<Threads>
) 并通过更改一个线程的状态来映射它并立即发出映射集合。
我已经忘记了 Rx 的 combineLatest
、mergeMap
、zip
和类似的东西,所以如果有人能帮助我在这种情况下实现组合,我将不胜感激(更多Rx 方式的)
每当 threads$
发出 和 时,这应该立即发出 Array<Thread>
当 online$
和 offline$
发出时。
const threadUpdate$ = Observable.merge(
threads$,
Observable.merge(online$, offline$)
.withLatestFrom(threads$,
(thread, threads) => threads.map(t => {
if(t.id === thread.id) {
t.status = thread.status
}
})));
请注意,threads$
将继续发射,甚至可能与合并的 online$
/offline$
流同时发射。
我想你可以这样使用 multicast()
:
const stop$ = Observable.merge(online$, offline$);
threads$
.multicast(new Subject(), obs => Observable.merge(obs, obs.takeUntil(stop$).takeLast(1).map(...)))
.subscribe(...);
我显然没有测试它,但也许它会把你推向正确的方向。
假设我正在开发一个聊天应用程序。我有可观察的 threads$
每 n 秒发出线程数组,可观察的 offline$
通知线程何时脱机,可观察的 online$
通知何时一个线程上线:
enum ConnectionStatus { Offline = 0, Online }
interface Thread {
id: string;
status: ConnectionStatus
}
const threads$ = Observable
.interval(n)
.switchMap(() => Observable.create((observer: Observer<Array<Thread>>) =>
getThreads((threads: Array<Thread>) => observer.next(threads))));
const online$ = Observable.create((observer: Observer<Thread>) =>
onOnline((threadId: string) => observer.next({
id: threadId,
status: ConnectionStatus.Online
})));
const offline$ = Observable.create((observer: Observer<Thread>) =>
onOffline((threadId: string) => observer.next({
id: threadId,
status: ConnectionStatus.Offline
})));
我想按照 规则 合并这些流:threads$
应该每 n 秒发出一次数组,但是每当 online$
或 offline$
发出,我想获取 threads$
的最新值 (Array<Threads>
) 并通过更改一个线程的状态来映射它并立即发出映射集合。
我已经忘记了 Rx 的 combineLatest
、mergeMap
、zip
和类似的东西,所以如果有人能帮助我在这种情况下实现组合,我将不胜感激(更多Rx 方式的)
每当 threads$
发出 和 时,这应该立即发出 Array<Thread>
当 online$
和 offline$
发出时。
const threadUpdate$ = Observable.merge(
threads$,
Observable.merge(online$, offline$)
.withLatestFrom(threads$,
(thread, threads) => threads.map(t => {
if(t.id === thread.id) {
t.status = thread.status
}
})));
请注意,threads$
将继续发射,甚至可能与合并的 online$
/offline$
流同时发射。
我想你可以这样使用 multicast()
:
const stop$ = Observable.merge(online$, offline$);
threads$
.multicast(new Subject(), obs => Observable.merge(obs, obs.takeUntil(stop$).takeLast(1).map(...)))
.subscribe(...);
我显然没有测试它,但也许它会把你推向正确的方向。