如何在 运行 时分享一个 Observable 并在它不是时重新订阅
How to share an Observable when it's running and resubscribe when it's not
我有一个 Observable,当我订阅它时我想要:
- 如果正在执行,return当前执行的结果
- 如果没有,re/execute Observable。
例如:
我有一个wait
函数。此函数等待 1 秒,return 当前 time()
function wait(): Observable<number> {
return new Observable<number>((observer) => {
setTimeout(() => {
observer.next((new Date).getTime());
observer.complete();
}, 1000);
});
}
现在,我有一个代码每 0.4 秒调用一次此 wait
函数。
- 第一次调用
wait
(0.4 秒)将等待 1 秒和 return 当前时间
- 第二次调用
wait
(0.8秒),因为有wait
的执行,会return第一个点的同一个observable和同一个结果
- 第三次调用
wait
(1.2 秒)将再次等待 1 秒,因为没有执行
- 等等
我不确定我理解你的例子,我认为第 3 次调用应该得到初始时间戳,第 4 次调用应该得到一个新的。
无论如何,你可以像这样通过半共享 Observable 来做到这一点:
const timeStream = timer(1000)
.pipe(map(() => (new Date).getTime()), share(), take(1));
function wait(): Observable<number> {
return timeStream;
}
setTimeout(() => wait().subscribe(console.log), 400);
setTimeout(() => wait().subscribe(console.log), 800);
setTimeout(() => wait().subscribe(console.log), 1200);
setTimeout(() => wait().subscribe(console.log), 1600);
https://stackblitz.com/edit/typescript-wiqpbf
关键是 share() 和 take(1) 组合 - 本质上这将使 Observable 仅在 'running' 时共享。
OP的最终解决方案:
import {Observable} from 'rxjs';
import {share, take} from 'rxjs/operators';
function myFunction(): Observable<number> {
return new Observable<number>((observer) => {
setTimeout(() => {
observer.next((new Date).getTime());
observer.complete();
}, 1000);
});
}
const task$ = myFunction().pipe(share(), take(1));
setInterval(() => {
task$.subscribe(console.log);
}, 400);
我有一个 Observable,当我订阅它时我想要:
- 如果正在执行,return当前执行的结果
- 如果没有,re/execute Observable。
例如:
我有一个wait
函数。此函数等待 1 秒,return 当前 time()
function wait(): Observable<number> {
return new Observable<number>((observer) => {
setTimeout(() => {
observer.next((new Date).getTime());
observer.complete();
}, 1000);
});
}
现在,我有一个代码每 0.4 秒调用一次此 wait
函数。
- 第一次调用
wait
(0.4 秒)将等待 1 秒和 return 当前时间 - 第二次调用
wait
(0.8秒),因为有wait
的执行,会return第一个点的同一个observable和同一个结果 - 第三次调用
wait
(1.2 秒)将再次等待 1 秒,因为没有执行 - 等等
我不确定我理解你的例子,我认为第 3 次调用应该得到初始时间戳,第 4 次调用应该得到一个新的。
无论如何,你可以像这样通过半共享 Observable 来做到这一点:
const timeStream = timer(1000)
.pipe(map(() => (new Date).getTime()), share(), take(1));
function wait(): Observable<number> {
return timeStream;
}
setTimeout(() => wait().subscribe(console.log), 400);
setTimeout(() => wait().subscribe(console.log), 800);
setTimeout(() => wait().subscribe(console.log), 1200);
setTimeout(() => wait().subscribe(console.log), 1600);
https://stackblitz.com/edit/typescript-wiqpbf
关键是 share() 和 take(1) 组合 - 本质上这将使 Observable 仅在 'running' 时共享。
OP的最终解决方案:
import {Observable} from 'rxjs';
import {share, take} from 'rxjs/operators';
function myFunction(): Observable<number> {
return new Observable<number>((observer) => {
setTimeout(() => {
observer.next((new Date).getTime());
observer.complete();
}, 1000);
});
}
const task$ = myFunction().pipe(share(), take(1));
setInterval(() => {
task$.subscribe(console.log);
}, 400);