RxJs: How to only maintain the latest value until the inner observable completes

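I'm new to RxJs and am having trouble achieving this the "RxJs way":

An infinite stream a$ emits a value a once in a while.

async() takes a and performs an asynchronous operation.

If a$ emits values while async is pending, only the latest one, al, is kept.

Once the previous async completes, if there is an al, run async(al).

And so on.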

a$:----a1----------a2----a3-----------------------a4-----------
       async(a1):------------end                  async(a4):---
                             async(a3):-----end
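
To pin down the rule in the diagram, here is a minimal, library-free sketch that replays a timeline and reports which values would actually be processed. The function name `simulateLatestOnly`, the `{ time, value }` event shape, and the fixed `duration` are all assumptions made for illustration; none of them come from RxJs.

```javascript
// Hypothetical helper: simulates "keep only the latest value while busy".
// `emissions` must be sorted by time; `duration` is how long each async run takes.
function simulateLatestOnly(emissions, duration) {
  const runs = [];
  let busyUntil = -Infinity; // time at which the current run ends
  let pending = null;        // latest emission seen while a run was in progress

  const start = (value, time) => {
    runs.push({ value, start: time, end: time + duration });
    busyUntil = time + duration;
  };

  for (const { time, value } of emissions) {
    // If the current run ended before this emission, the stored value ran first.
    if (pending !== null && busyUntil <= time) {
      const next = pending;
      pending = null;
      start(next.value, busyUntil);
    }
    if (time >= busyUntil) {
      start(value, time);  // idle: process right away
    } else {
      pending = { value }; // busy: overwrite any older stored value
    }
  }
  // A value still stored at the end runs once the last run finishes.
  if (pending !== null) start(pending.value, busyUntil);
  return runs;
}

const runs = simulateLatestOnly(
  [
    { time: 0, value: "a1" },
    { time: 12, value: "a2" },
    { time: 18, value: "a3" },
    { time: 50, value: "a4" },
  ],
  25
);
console.log(runs.map(r => `${r.value}@${r.start}`).join(", "));
// prints "a1@0, a3@25, a4@50"
```

a2 never runs because a3 replaces it while async(a1) is still pending, which is the behaviour the diagram asks for.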

This is what I came up with, and it's a bit nasty:

var asyncIdle$ = new Rx.BehaviorSubject()
var asyncRunning$ = new Rx.Subject()
var async$ = asyncIdle$

function async (val) {
  async$ = asyncRunning$
  // do something with val
  console.log(val + ' handling')
  setTimeout(() => {
    console.log(val + ' complete')
    async$.next()
    async$ = asyncIdle$
  }, 2000)
}

// simulate a$
var a$ = Rx.Observable.fromEvent(document, 'click')
.mapTo(1)
.scan((acc, curr) => acc + curr)
.do(val => console.log('got ' + val))


a$.debounce(() => async$)
.subscribe(val => {
  async(val)
})

Use a combination of first() and repeat(). If a$ completes, the whole sequence completes.

//emit every 1s
const a$=new Rx.BehaviorSubject(0)
Rx.Observable.interval(1000).take(100).skip(1).subscribe(a$);

// simulate async
const async = (val)=>{
  console.log('async start with:'+ val)
  return Rx.Observable.timer(5100).mapTo('async done:'+val);
}

a$.first().switchMap(value=>async(value))
  .repeat()
  .catch(e=>Rx.Observable.empty())
  .subscribe(console.log, console.error, console.warn)

a$.subscribe(console.warn)

https://jsbin.com/tohahod/65/edit?js,console

You can use the audit operator to solve this, like so (the comments should explain how it works):

// Simulate the source.

const source = Rx.Observable.merge(
  Rx.Observable.of(1).delay(0),
  Rx.Observable.of(2).delay(10),
  Rx.Observable.of(3).delay(20),
  Rx.Observable.of(4).delay(150),
  Rx.Observable.of(5).delay(300)
).do(value => console.log("source", value));

// Simulate the async task.

function asyncTask(value) {
  return Rx.Observable
    .of(value)
    .do(value => console.log(" before async", value))
    .delay(100)
    .do(value => console.log(" after async", value));
}

// Compose an observable that's based on the source.
// Use audit to ensure a value is not emitted until
// the async task has been performed.
// Use share so that the signal does not affect a
// second subscription to the source.

let signal;

const audited = source
  .audit(() => signal)
  .mergeMap(value => asyncTask(value))
  .share();

// Compose a signal from the audited observable to
// which the async task is applied.
// Use startWith so that the first emitted value
// passes the audit.

signal = audited
  .mapTo(true)
  .startWith(true);

audited.subscribe(value => console.log("output", value));

(The snippet above runs against RxJS 5, loaded from https://unpkg.com/rxjs@5/bundles/Rx.min.js.)

I came up with this solution in TypeScript:

I have a simple Gate class that can be opened or closed:

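// Imports for the Gate class and the requestThrottle operator further down (RxJS 6+ pipeable style).
import { BehaviorSubject, Observable } from "rxjs";
import { audit, filter, finalize, mergeMap } from "rxjs/operators";

enum GateStatus {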
  open = "open",
  closed = "closed"
}

class Gate {
  private readonly gate$: BehaviorSubject<GateStatus>;

  readonly open$: Observable<GateStatus>;
  readonly closed$: Observable<GateStatus>;

  constructor(initialState = GateStatus.open) {
    this.gate$ = new BehaviorSubject<GateStatus>(initialState);
    this.open$ = this.gate$
      .asObservable()
      .pipe(filter(status => status === GateStatus.open));
    this.closed$ = this.gate$
      .asObservable()
      .pipe(filter(status => status === GateStatus.closed));
  }

  open() {
    this.gate$.next(GateStatus.open);
  }
  close() {
    this.gate$.next(GateStatus.closed);
  }
}

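The operator function is quite simple. Initially the gate is open. Before starting a request we close it, and once the request finishes we open it again.
audit() lets only the most recent request data through, and only when the gate opens.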

export const requestThrottle = <T>(
  requestHandlerFactory: (requestData: T) => Observable<any>
) => (requestData: Observable<T>) => {
  const gate = new Gate();
  return requestData.pipe(
    audit(_ => gate.open$),
    // NOTE: when the order is important, use concatMap() instead of mergeMap()
    mergeMap(value => {
      gate.close();
      return requestHandlerFactory(value).pipe(finalize(() => gate.open()));
    })
  );
};

Use it like this:

src.pipe(
    requestThrottle(() => of(1).pipe(delay(100)))
);
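
For comparison, the same open/closed gate idea can be sketched without RxJS at all. This is only an illustration under stated assumptions: `createLatestOnlyRunner` and its `task(value, done)` callback contract are hypothetical names invented here, not part of RxJS or of the operator above.

```javascript
// Hypothetical helper (not part of RxJS): runs `task` for one value at a time
// and remembers only the most recent value pushed while a run is in flight.
// `task(value, done)` must call `done()` exactly once when its work finishes.
function createLatestOnlyRunner(task) {
  let gateOpen = true;    // mirrors Gate: open when no request is running
  let hasPending = false; // separate flag so `undefined` is a valid value
  let pending;

  function run(value) {
    gateOpen = false; // close the gate before starting the request
    task(value, () => {
      gateOpen = true; // reopen once the request has finished
      if (hasPending) {
        const next = pending;
        hasPending = false;
        pending = undefined;
        run(next); // let the most recent stored value through
      }
    });
  }

  return function push(value) {
    if (gateOpen) {
      run(value);
    } else {
      pending = value; // overwrite any older stored value
      hasPending = true;
    }
  };
}

// Usage: simulate a 100 ms request, similar to delay(100) above.
const push = createLatestOnlyRunner((value, done) => {
  console.log("request start", value);
  setTimeout(() => {
    console.log("request end", value);
    done();
  }, 100);
});

push(1); // starts immediately
push(2); // stored while 1 is in flight
push(3); // replaces 2 and runs once 1 has finished
```

Unlike the RxJS version, this sketch has no notion of errors, completion, or unsubscription, so it is meant to show the gating logic rather than to replace the operator.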

Code example on StackBlitz