RxJs Observable 多次完成
RxJs Observable completes multiple times
下面是响应式代码的一小段代码 (RxJs
)
let subj = new Rx.Subject();
let chain = subj
.switchMap(v => Rx.Observable.of(10*v).do(vv => console.log("Switch map", vv)))
.share()
.take(1);
function subscribe(){
chain.subscribe(v => console.log("Next", v),
err => console.log("Error",err),
() => console.log("Completed"));
chain.subscribe(v => console.log("Next2", v),
err => console.log("Error2",err),
() => console.log("Completed2"));
subj.next(Math.random());
}
subscribe();
subscribe();
subscribe();
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
根据 documentation chain
is an Observable
which should print the emitted value * 10 (switchMap), while printing it only once, no matter what's the number of subscriptions it has (share),仅对第一个发出的值执行此操作,然后完成。
前两颗子弹没问题,但最后一颗不行。
这是我得到的输出:
Switch map 9.022491050934722
Next 9.022491050934722
Completed
Next2 9.022491050934722
Completed2
Switch map 9.172999425126836
Next 9.172999425126836
Completed
Next2 9.172999425126836
Completed2
Switch map 6.168790337405257
Next 6.168790337405257
Completed
Next2 6.168790337405257
Completed2
如您所见,chain
完成了多次。
是什么使得可以多次完成相同的 Observable
?
share
是 publish
和 refCount
组合的快捷方式,这意味着流只有 "hot" 只要有至少 1 个订阅者,因此在流完成后,所有活跃的订阅者都会自动取消订阅,这又会重置流,因为那时订阅者为 0。另外:您应该将 take(1)
放在 share
之前,因为任何后续操作都会影响 hot-state.
如何使流 "truely" shared/hot 独立于任何订阅者:使用 publish
和 connect
流:
let subj = new Rx.Subject();
let chain = subj
.switchMap(v => Rx.Observable.of(10*v).do(vv => console.log("Switch map", vv)))
.take(1)
.publish();
chain.connect();
function subscribe(){
chain.subscribe(v => console.log("Next", v),
err => console.log("Error",err),
() => console.log("Completed"));
chain.subscribe(v => console.log("Next2", v),
err => console.log("Error2",err),
() => console.log("Completed2"));
subj.next(Math.random());
}
subscribe();
subscribe();
subscribe();
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
下面是响应式代码的一小段代码 (RxJs
)
let subj = new Rx.Subject();
let chain = subj
.switchMap(v => Rx.Observable.of(10*v).do(vv => console.log("Switch map", vv)))
.share()
.take(1);
function subscribe(){
chain.subscribe(v => console.log("Next", v),
err => console.log("Error",err),
() => console.log("Completed"));
chain.subscribe(v => console.log("Next2", v),
err => console.log("Error2",err),
() => console.log("Completed2"));
subj.next(Math.random());
}
subscribe();
subscribe();
subscribe();
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
根据 documentation chain
is an Observable
which should print the emitted value * 10 (switchMap), while printing it only once, no matter what's the number of subscriptions it has (share),仅对第一个发出的值执行此操作,然后完成。
前两颗子弹没问题,但最后一颗不行。 这是我得到的输出:
Switch map 9.022491050934722
Next 9.022491050934722
Completed
Next2 9.022491050934722
Completed2
Switch map 9.172999425126836
Next 9.172999425126836
Completed
Next2 9.172999425126836
Completed2
Switch map 6.168790337405257
Next 6.168790337405257
Completed
Next2 6.168790337405257
Completed2
如您所见,chain
完成了多次。
是什么使得可以多次完成相同的 Observable
?
share
是 publish
和 refCount
组合的快捷方式,这意味着流只有 "hot" 只要有至少 1 个订阅者,因此在流完成后,所有活跃的订阅者都会自动取消订阅,这又会重置流,因为那时订阅者为 0。另外:您应该将 take(1)
放在 share
之前,因为任何后续操作都会影响 hot-state.
如何使流 "truely" shared/hot 独立于任何订阅者:使用 publish
和 connect
流:
let subj = new Rx.Subject();
let chain = subj
.switchMap(v => Rx.Observable.of(10*v).do(vv => console.log("Switch map", vv)))
.take(1)
.publish();
chain.connect();
function subscribe(){
chain.subscribe(v => console.log("Next", v),
err => console.log("Error",err),
() => console.log("Completed"));
chain.subscribe(v => console.log("Next2", v),
err => console.log("Error2",err),
() => console.log("Completed2"));
subj.next(Math.random());
}
subscribe();
subscribe();
subscribe();
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>