如何在 运行 时分享一个 Observable 并在它不是时重新订阅

How to share an Observable when it's running and resubscribe when it's not

我有一个 Observable,当我订阅它时我想要:

例如:

我有一个wait函数。此函数等待 1 秒,return 当前 time()

function wait(): Observable<number> {
  return new Observable<number>((observer) => {
    setTimeout(() => {
      observer.next((new Date).getTime());
      observer.complete();
    }, 1000);
  });
}

现在,我有一个代码每 0.4 秒调用一次此 wait 函数。

我不确定我理解你的例子,我认为第 3 次调用应该得到初始时间戳,第 4 次调用应该得到一个新的。

无论如何,你可以像这样通过半共享 Observable 来做到这一点:

const timeStream = timer(1000)
    .pipe(map(() => (new Date).getTime()), share(), take(1));

function wait(): Observable<number> {
    return timeStream;
}

setTimeout(() => wait().subscribe(console.log), 400);
setTimeout(() => wait().subscribe(console.log), 800);
setTimeout(() => wait().subscribe(console.log), 1200);
setTimeout(() => wait().subscribe(console.log), 1600);

https://stackblitz.com/edit/typescript-wiqpbf

关键是 share() 和 take(1) 组合 - 本质上这将使 Observable 仅在 'running' 时共享。


OP的最终解决方案:

import {Observable} from 'rxjs';
import {share, take} from 'rxjs/operators';

function myFunction(): Observable<number> {
  return new Observable<number>((observer) => {
    setTimeout(() => {
      observer.next((new Date).getTime());
      observer.complete();
    }, 1000);
  });
}

const task$ = myFunction().pipe(share(), take(1));

setInterval(() => {
  task$.subscribe(console.log);
}, 400);