将可观察到的主题
Turn observable into subject
我们有一个函数可以从后端获取流作为可观察的。然而,我们希望能够在后端完成这些更改之前也能够推送到该可观察对象以查看更改。为此,我尝试返回一个主题,但在取消订阅后连接仍在继续。
换句话说,在下面的代码中,我们希望 console.log(i)
在我们订阅该主题之前不开始,并在我们取消订阅时结束:
import { ReplaySubject, Observable, interval } from 'rxjs';
import { tap } from 'rxjs/operators'
function test() {
const obs = interval(1000).pipe(tap(i => console.log(i)));
const subj = new ReplaySubject(1);
obs.subscribe(subj);
return subj;
}
const subject = test();
subject.next('TEST');
const subscription = subject.pipe(
tap(i => console.log('from outside ' + i))
).subscribe()
setTimeout(_ => subscription.unsubscribe(), 5000);
您不能在 test
中 subscribe
。我想你想创建一个 Observable 和一个 Subject 以及 merge 那些 - 你必须分别 return 。
return [subject, merge(subject, obs)]
然后
const [subject, obs] = test();
subject.next()
但我会通过提供主题作为参数来实现。
import { ReplaySubject, Observable, interval, merge } from 'rxjs';
import { tap } from 'rxjs/operators'
function test(subject) {
return merge(
interval(1000).pipe(tap(i => console.log(i))),
subject
);
}
const subject = new ReplaySubject(1);
const obs = test(subject);
subject.next('TEST');
const subscription = obs.pipe(
tap(i => console.log('from outside ' + i))
).subscribe()
setTimeout(_ => subscription.unsubscribe(), 5000);
我们有一个函数可以从后端获取流作为可观察的。然而,我们希望能够在后端完成这些更改之前也能够推送到该可观察对象以查看更改。为此,我尝试返回一个主题,但在取消订阅后连接仍在继续。
换句话说,在下面的代码中,我们希望 console.log(i)
在我们订阅该主题之前不开始,并在我们取消订阅时结束:
import { ReplaySubject, Observable, interval } from 'rxjs';
import { tap } from 'rxjs/operators'
function test() {
const obs = interval(1000).pipe(tap(i => console.log(i)));
const subj = new ReplaySubject(1);
obs.subscribe(subj);
return subj;
}
const subject = test();
subject.next('TEST');
const subscription = subject.pipe(
tap(i => console.log('from outside ' + i))
).subscribe()
setTimeout(_ => subscription.unsubscribe(), 5000);
您不能在 test
中 subscribe
。我想你想创建一个 Observable 和一个 Subject 以及 merge 那些 - 你必须分别 return 。
return [subject, merge(subject, obs)]
然后
const [subject, obs] = test();
subject.next()
但我会通过提供主题作为参数来实现。
import { ReplaySubject, Observable, interval, merge } from 'rxjs';
import { tap } from 'rxjs/operators'
function test(subject) {
return merge(
interval(1000).pipe(tap(i => console.log(i))),
subject
);
}
const subject = new ReplaySubject(1);
const obs = test(subject);
subject.next('TEST');
const subscription = obs.pipe(
tap(i => console.log('from outside ' + i))
).subscribe()
setTimeout(_ => subscription.unsubscribe(), 5000);