Combining/merging 观察值

Combining/merging observables

假设我正在开发一个聊天应用程序。我有可观察的 threads$n 秒发出线程数组,可观察的 offline$ 通知线程何时脱机,可观察的 online$ 通知何时一个线程上线:

enum ConnectionStatus { Offline = 0, Online }

interface Thread {
    id: string;
    status: ConnectionStatus
}

const threads$ = Observable
    .interval(n)
    .switchMap(() => Observable.create((observer: Observer<Array<Thread>>) =>
        getThreads((threads: Array<Thread>) => observer.next(threads))));

const online$ = Observable.create((observer: Observer<Thread>) =>
    onOnline((threadId: string) => observer.next({
        id: threadId,
        status: ConnectionStatus.Online
    })));

const offline$ = Observable.create((observer: Observer<Thread>) =>
    onOffline((threadId: string) => observer.next({
        id: threadId,
        status: ConnectionStatus.Offline
    })));

我想按照 规则 合并这些流:threads$ 应该每 n 秒发出一次数组,但是每当 online$offline$发出,我想获取 threads$ 的最新值 (Array<Threads>) 并通过更改一个线程的状态来映射它并立即发出映射集合。

我已经忘记了 Rx 的 combineLatestmergeMapzip 和类似的东西,所以如果有人能帮助我在这种情况下实现组合,我将不胜感激(更多Rx 方式的)

每当 threads$ 发出 时,这应该立即发出 Array<Thread>online$offline$ 发出时。

const threadUpdate$ = Observable.merge(
    threads$,
    Observable.merge(online$, offline$)
        .withLatestFrom(threads$,
            (thread, threads) => threads.map(t => {
                if(t.id === thread.id) {
                    t.status = thread.status
                }
            })));

请注意,threads$ 将继续发射,甚至可能与合并的 online$/offline$ 流同时发射。

我想你可以这样使用 multicast():

const stop$ = Observable.merge(online$, offline$);
threads$
    .multicast(new Subject(), obs => Observable.merge(obs, obs.takeUntil(stop$).takeLast(1).map(...)))
    .subscribe(...);

我显然没有测试它,但也许它会把你推向正确的方向。