序列化 Observables

Serialising Observables

我想序列化返回可观察对象的函数的执行。我已经设法做到了这一点,但这不是一个非常务实的解决方案。实现以下目标的反应性更强的方法是什么?

import Rx from 'rx'

export default class Executor {

  constructor() {
    this.queue = []
    this.draining = false
  }

  run(fn) {
    return Rx.Observable.create(o => {
      this.queue.push(Rx.Observable.defer(() =>
       fn()
          .doOnNext((value) => o.onNext(value))
          .doOnError((err) => o.onError(err))
          .doOnCompleted(() => o.onCompleted())))

      async () => {
        if (this.draining) {
          return
        }

        this.draining = true

        while (this.queue.length > 0) {
          try {
            await this.queue.shift().toPromise()
          } catch(err) {
            // Do nothing...
          }
        }

        this.draining = false
      }()
    })
  }
}

不完全确定您要实现的目标,但听起来您在使用 flatMapWithMaxConcurrent 运算符。

import Rx from 'rx'

export default class Executor {

  constructor() {
    // Queue of: () => Observable
    this.queue = new Rx.Subject();
    this.queue
        .flatMapWithMaxConcurrent(1,
            fn => fn().catch(Rx.Observable.empty()) // Replace with proper error handling
        )
        .subscribe(result => console.log(result));
  }

  push(fn) {
    this.queue.onNext(fn);
  }
}