Behaviorsubject 和 of() 的行为方式完全不同
Behaviorsubject and of() behave ina completely different way
我正在尝试重新创建可观察对象的处理链,其中第一步必须是主题(我需要调用 .next() )。使用 of() 有效,但是 returns 一个 Observable,使用一个 BehaviorSubject 应该有类似的效果,但它不起作用:用 of 创建的 observable 完美地工作并且订阅通过管道带来数据并且 returns 修改后的数据,而对于 behaviorsubject,数据保留在主题中,订阅永远不会获取数据。
示例:
getProcessed(processed: string = null, identifier = 'default'): Observable<any> {
const bs = new BehaviorSubject(this.start.data);
this.localFilterSub.set(identifier, bs);
this.localFilterObs.set(identifier, bs.asObservable());
this.localFilterSet.set(identifier, {});
this.process(processed, identifier);
return this.localFilterObs.get(identifier);
}
process(name: string, identifier = 'default') {
this.localFilterObs.set(identifier, this.doProcess(name, identifier));
}
private doProcess(name: string, identifier = 'default'): Observable<any>|Subject<any> {
if (name) {
const inst = new Op();
const obss = [];
obss.push(this.localFilterObs.get(identifier));
obss.push(inst.getExternal());
return forkJoin(obss).pipe(
tap((data) => {
console.log(name, data);
}),
map((data) => {
return inst?.run(data);
// this.done.push(name);
}),
tap((data) => {
console.log(name, data);
}),
);
}
}
我真的不明白我做错了什么。
forkJoin 在所有可观察对象完成之前不会发出。 of 将立即完成,但行为主题在您调用完成之前不会完成。使用 combineLatest,它会在所有 observable 发出后发出。
const { BehaviorSubject, of, forkJoin } = rxjs;
const bs$ = new BehaviorSubject('bs');
o$ = of('of')
forkJoin([bs$, o$]).subscribe(res => { console.log(res); });
console.log('Nothing yet as bs$ not complete');
setTimeout(() => { bs$.complete(); }, 2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>
但 combineLatest 会立即发出
const { BehaviorSubject, of, combineLatest } = rxjs;
const bs$ = new BehaviorSubject('bs');
o$ = of('of')
combineLatest([bs$, o$]).subscribe(res => { console.log(res); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>
我正在尝试重新创建可观察对象的处理链,其中第一步必须是主题(我需要调用 .next() )。使用 of() 有效,但是 returns 一个 Observable,使用一个 BehaviorSubject 应该有类似的效果,但它不起作用:用 of 创建的 observable 完美地工作并且订阅通过管道带来数据并且 returns 修改后的数据,而对于 behaviorsubject,数据保留在主题中,订阅永远不会获取数据。
示例:
getProcessed(processed: string = null, identifier = 'default'): Observable<any> {
const bs = new BehaviorSubject(this.start.data);
this.localFilterSub.set(identifier, bs);
this.localFilterObs.set(identifier, bs.asObservable());
this.localFilterSet.set(identifier, {});
this.process(processed, identifier);
return this.localFilterObs.get(identifier);
}
process(name: string, identifier = 'default') {
this.localFilterObs.set(identifier, this.doProcess(name, identifier));
}
private doProcess(name: string, identifier = 'default'): Observable<any>|Subject<any> {
if (name) {
const inst = new Op();
const obss = [];
obss.push(this.localFilterObs.get(identifier));
obss.push(inst.getExternal());
return forkJoin(obss).pipe(
tap((data) => {
console.log(name, data);
}),
map((data) => {
return inst?.run(data);
// this.done.push(name);
}),
tap((data) => {
console.log(name, data);
}),
);
}
}
我真的不明白我做错了什么。
forkJoin 在所有可观察对象完成之前不会发出。 of 将立即完成,但行为主题在您调用完成之前不会完成。使用 combineLatest,它会在所有 observable 发出后发出。
const { BehaviorSubject, of, forkJoin } = rxjs;
const bs$ = new BehaviorSubject('bs');
o$ = of('of')
forkJoin([bs$, o$]).subscribe(res => { console.log(res); });
console.log('Nothing yet as bs$ not complete');
setTimeout(() => { bs$.complete(); }, 2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>
但 combineLatest 会立即发出
const { BehaviorSubject, of, combineLatest } = rxjs;
const bs$ = new BehaviorSubject('bs');
o$ = of('of')
combineLatest([bs$, o$]).subscribe(res => { console.log(res); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>