RxJs: How to keep only the latest value until the inner observable completes
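I'm new to RxJs and having trouble doing this the "RxJs way":
An infinite stream a$ emits a value a once in a while.
async() takes that a and performs an asynchronous operation.
If a$ emits values while async is pending, only the latest one, al, is kept.
After the previous async completes, if there is an al, run async(al).
And so on.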
a$:----a1----------a2----a3-----------------------a4-----------
       async(a1):------------end                  async(a4):---
                             async(a3):-----end
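To pin down the behaviour in the diagram above, here is a small dependency-free sketch that replays a list of timestamped emissions on a virtual timeline and reports which values would actually be handed to the async task. It is not RxJs code: the name `simulateLatestOnly`, the `{ time, value }` shape and the fixed task `duration` are all assumptions made for illustration.

```javascript
// Hypothetical helper: models "keep only the latest value while busy"
// on a virtual timeline. `emissions` must be sorted by `time`;
// `duration` is how long each simulated async task takes (an assumption).
function simulateLatestOnly(emissions, duration) {
  const runs = [];            // tasks that actually get executed
  let busyUntil = -Infinity;  // time at which the running task finishes
  let pending = null;         // latest emission that arrived while busy

  // If a value is waiting and the running task has finished by `now`,
  // start the waiting value at the moment the previous task ended.
  const flush = (now) => {
    if (pending !== null && busyUntil <= now) {
      runs.push({ value: pending.value, start: busyUntil });
      busyUntil += duration;
      pending = null;
    }
  };

  for (const { time, value } of emissions) {
    flush(time);
    if (time >= busyUntil) {
      // Idle: run the value immediately.
      runs.push({ value, start: time });
      busyUntil = time + duration;
    } else {
      // Busy: overwrite any older waiting value with the latest one.
      pending = { value };
    }
  }
  flush(Infinity); // run whatever is still waiting at the end
  return runs;
}

// Same shape as the diagram: a2 and a3 arrive while async(a1) is pending.
const timeline = [
  { time: 0, value: "a1" },
  { time: 10, value: "a2" },
  { time: 15, value: "a3" },
  { time: 50, value: "a4" },
];
console.log(simulateLatestOnly(timeline, 20));
```

With these assumed timings a2 is dropped, a3 starts as soon as a1's task ends, and a4 starts immediately because nothing is running when it arrives.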
Here is what I came up with, and it's a bit nasty:
var asyncIdle$ = new Rx.BehaviorSubject()
var asyncRunning$ = new Rx.Subject()
var async$ = asyncIdle$

function async (val) {
  async$ = asyncRunning$
  // do something with val
  console.log(val + ' handling')
  setTimeout(() => {
    console.log(val + ' complete')
    async$.next()
    async$ = asyncIdle$
  }, 2000)
}

// simulate a$
var a$ = Rx.Observable.fromEvent(document, 'click')
  .mapTo(1)
  .scan((acc, curr) => acc + curr)
  .do(val => console.log('got ' + val))

a$.debounce(() => async$)
  .subscribe(val => {
    async(val)
  })
Use a combination of first() and repeat(). If a$ completes, the whole sequence completes.
// emit every 1s
const a$ = new Rx.BehaviorSubject(0)
Rx.Observable.interval(1000).take(100).skip(1).subscribe(a$);

// simulate async
const async = (val) => {
  console.log('async start with:' + val)
  return Rx.Observable.timer(5100).mapTo('async done:' + val);
}

a$.first().switchMap(value => async(value))
  .repeat()
  .catch(e => Rx.Observable.empty())
  .subscribe(console.log, console.error, console.warn)

a$.subscribe(console.warn)
You can use the audit operator to solve this, like this (the comments should explain how it works):
// Simulate the source.
const source = Rx.Observable.merge(
  Rx.Observable.of(1).delay(0),
  Rx.Observable.of(2).delay(10),
  Rx.Observable.of(3).delay(20),
  Rx.Observable.of(4).delay(150),
  Rx.Observable.of(5).delay(300)
).do(value => console.log("source", value));

// Simulate the async task.
function asyncTask(value) {
  return Rx.Observable
    .of(value)
    .do(value => console.log(" before async", value))
    .delay(100)
    .do(value => console.log(" after async", value));
}

// Compose an observable that's based on the source.
// Use audit to ensure a value is not emitted until
// the async task has been performed.
// Use share so that the signal does not affect a
// second subscription to the source.
let signal;
const audited = source
  .audit(() => signal)
  .mergeMap(value => asyncTask(value))
  .share();

// Compose a signal from the audited observable to
// which the async task is applied.
// Use startWith so that the first emitted value
// passes the audit.
signal = audited
  .mapTo(true)
  .startWith(true);

audited.subscribe(value => console.log("output", value));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
I came up with this solution in TypeScript:
I have a simple Gate class that can be opened or closed:
import { BehaviorSubject, Observable } from "rxjs";
import { filter } from "rxjs/operators";

enum GateStatus {
  open = "open",
  closed = "closed"
}

class Gate {
  private readonly gate$: BehaviorSubject<GateStatus>;
  readonly open$: Observable<GateStatus>;
  readonly closed$: Observable<GateStatus>;

  constructor(initialState = GateStatus.open) {
    this.gate$ = new BehaviorSubject<GateStatus>(initialState);
    // Emits whenever the gate is (or becomes) open.
    this.open$ = this.gate$
      .asObservable()
      .pipe(filter(status => status === GateStatus.open));
    // Emits whenever the gate is (or becomes) closed.
    this.closed$ = this.gate$
      .asObservable()
      .pipe(filter(status => status === GateStatus.closed));
  }

  open() {
    this.gate$.next(GateStatus.open);
  }

  close() {
    this.gate$.next(GateStatus.closed);
  }
}
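The operator function is quite simple. Initially the gate is open. Before starting a request we close it, and once the request has finished we open it again.
audit() only lets the most recent request data through once the gate is open.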
import { Observable } from "rxjs";
import { audit, finalize, mergeMap } from "rxjs/operators";

export const requestThrottle = <T>(
  requestHandlerFactory: (requestData: T) => Observable<any>
) => (requestData: Observable<T>) => {
  const gate = new Gate();
  return requestData.pipe(
    audit(_ => gate.open$),
    // NOTE: when the order is important, use concatMap() instead of mergeMap()
    mergeMap(value => {
      gate.close();
      return requestHandlerFactory(value).pipe(finalize(() => gate.open()));
    })
  );
};
Use it like this:
src.pipe(
  requestThrottle(() => of(1).pipe(delay(100)))
);
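The gate idea can also be sketched without RxJs, which may make it easier to see what the busy flag and the "latest value" slot are doing. This is only an illustration under assumptions: `createLatestOnlyRunner` and the `task(value, done)` callback convention are hypothetical names, not part of RxJs or of the answer above.

```javascript
// Hypothetical helper, not part of RxJs: `task(value, done)` must call
// `done()` once its async work has finished.
function createLatestOnlyRunner(task) {
  let busy = false;        // true while a task is running (gate closed)
  let hasPending = false;  // whether a value arrived while busy
  let pending;             // the latest such value

  function run(value) {
    busy = true;           // close the gate
    task(value, () => {
      busy = false;        // open the gate again
      if (hasPending) {
        const next = pending;
        hasPending = false;
        pending = undefined;
        run(next);         // only the latest waiting value is run
      }
    });
  }

  // Call the returned function for every value emitted by the source.
  return function push(value) {
    if (busy) {
      pending = value;     // overwrite any older waiting value
      hasPending = true;
    } else {
      run(value);
    }
  };
}

// Usage sketch: a task that takes 20 ms per value.
const push = createLatestOnlyRunner((value, done) => {
  console.log("handling", value);
  setTimeout(() => {
    console.log("complete", value);
    done();
  }, 20);
});
push(1);
push(2); // arrives while 1 is running and is later replaced by 3
push(3);
```

Since `push` is an ordinary function taking one value, it could also be passed as the next handler to `subscribe`. Unlike the operator above, this sketch handles neither errors nor unsubscription.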