将 RxJS v4 代码转换为 v5,使用 "pull" 处理队列

Converting RxJS v4 code to v5, processing a queue with a "pull"

---abcde-----f-------gh-----i---->  //Events

我有一个 "work queue" 想要 observe/subscribe。这是一组要处理的命令对象。新的工作项目通常会突然出现,并且需要按顺序进行处理(按照收到的顺序,一次一个,直到完全处理完毕)。

我正在使用 RxJS 5.0.0-beta.6。 (其他图书馆强加的版本)

这是一个工作示例,它说明了我想要的行为,但使用的是 RxJS v4。

有问题的主要代码是这样的...

var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
  .timestamp()
  .tap(({timestamp}) => updatePanelAppend(pending, timestamp));

var inProgress$ = events$;

var done$ = inProgress$
  .flatMapWithMaxConcurrent(1, ({timestamp}) => 
                            Rx.Observable.fromPromise(() => {
                              updatePanelAppend(inProgress, timestamp);
                              removeFromPanel(pending, timestamp);
                              return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)
                           }));

done$.subscribeOnNext((timestamp) => {
  updatePanelAppend(done, timestamp);
  removeFromPanel(inProgress, timestamp);
});

http://jsbin.com/meyife/edit?js,output

鉴于 API 和 incomplete/changing 文档的当前 beta 状态,我不知道如何在 RxJS 5 中执行此操作。

更新:这个从 v4 到 v5 的 migration guide 显示了许多被删除的功能,但没有指导如何以新的方式做事。删除的操作示例:.tap、.controlled、.flatMapWithMaxConcurrent(已重命名)。

  • flatMap/mergeMap - 现在采用并发参数

  • tap -> do

  • subscribeOnNext 不再存在,所以只需使用带有单个参数的 subscribe

  • fromPromise 重载在 RxJS 5 上不存在,所以使用 defer 代替。

查看更新后的 jsbin here