计算链中除 combineLatest 之外的其他运算符,以避免冗余计算

Other operator in calculation chain than combineLatest to avoid redundant calculations

我已经使用 RxJS 成功地将一个更大的 Excel 计算迁移到 JS。任何输入数据都表示为一个 Observable,当任何 Excel 公式使用多个输入时,后续计算将使用 .map.combineLatest 实现。

除一个问题外,这很好用。这是一个简化的例子:

三个输入(a$=1b$=2c$=3)用于两个不同的计算(第一个是$ab = $a+$b = 3,第二个是$bc = $b+$c = 5 ) 作为计算最终结果的中间步骤 $abbc = $ab + $bc = 8

当$b现在是updated/emitting一个新的输入值时(例如4),$abbc被计算两次——第一次是在$ab被更新时(导致错误的结果$abbc=10) 并在更新 $bc 时再次生成正确的结果 12.

虽然最终结果是正确的,但中间的计算既错误又多余。有什么方法可以只在 $b 更新的情况下执行最后一次计算 - 同时在更新 a$c$ 时仍然更新计算(这将排除 zip 运算符)。我知道这个例子显然可以简化以省去中间步骤并直接从 $a$b$c 计算 $abbc - 但在真实的例子中这是不是 possible/feasible.

这是 JSbin 中的 运行 示例:https://jsbin.com/pipiyodixa/edit?js,console

这里的问题是 RxJS 的行为在设计上是正确的。

确实应该先更新b => ab => abbc 然后bc => abbc。它处理流中的值。

你想要的是处理 "layers".abcacbc 和之后计算最终值 abbc.

我能想到的唯一方法是利用 JavaScript 执行上下文和 setTimeout(() => {}, 0) 的技巧。这样你就不会安排任何超时(实际上是真正的 timeout might be > 0),而只是 运行 在 JavaScript 完成当前执行上下文后另一个执行上下文中的闭包。

糟糕的是,为了避免多次重新发出值,您需要缓存更多(因为 merge()):

var b2$ = b$.cache(1);

var ab$ = a$
  .combineLatest(b2$, (a, b) => a + b)
  .do(x => console.log('$a + $b = ' + x))
  .cache(1);

var bc$ = b2$
  .combineLatest(c$, (b, c) => b + c)
  .do(x => console.log('$b + $c = ' + x))
  .cache(1);

var abbc$ = new Rx.Observable.merge(ab$, bc$)
  .auditTime(0)
  .withLatestFrom(ab$, bc$, (_, ab, bc) => ab + bc)
  .do(x => console.log('$ab + $bc = ' + x));

console.log("Initial subscription:")
abbc$.subscribe();

b$.next(4);

auditTime() 运算符是这里最重要的东西。它触发 withLatestFrom() 更新它的值,而当它第一次被 ab$bc$ 触发时,它会忽略所有连续的发射,直到闭包执行结束(即 setTimeout()把戏)。

观看现场演示:https://jsbin.com/zoferid/edit?js,console

此外,如果您添加 a$.next(5);,最终计算只执行一次(这可能是好是坏 :))。

我不知道这是否能解决您的问题,因为正如我所说,RxJS 就是这样工作的。我还没有在任何更复杂的示例中对其进行测试,所以这可能不是您最后可以使用的方式。

请注意,cache() 运算符已在 RC.1 中删除,目前没有替代品:https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md

根据@martin 的(正确)回答,我创建了一个执行 combineLatestDelayed 的 rxjs 运算符:

Rx.Observable.prototype.combineLatestDelayed = function(b$, cb) {
  var a2$ = this.cache(1);
  var b2$ = b$.cache(1);
  return a2$
    .merge(b2$)
    .auditTime(0)
    .withLatestFrom(a2$, b2$, (_, a, b) => cb(a,b));
}

这样,我只需要将原来的.combineLatest调用替换为.combineLatestDelayed:

var a$ = new Rx.BehaviorSubject(1).do(x => console.log("Emitting $a=" + x)); 
var b$ = new Rx.BehaviorSubject(2).do(x => console.log("Emitting $b=" + x)); var b1$ = b$.cache(1);
var c$ = new Rx.BehaviorSubject(3).do(x => console.log("Emitting $c=" + x));

var ab$ = a$
  .combineLatestDelayed(b1$, (a, b) => a + b)
  .do(x => console.log('ab$: $a + $b = ' + x))

var bc$ = b1$
  .combineLatestDelayed(c$, (b, c) => b + c)
  .do(x => console.log('bc$: $b + $c = ' + x))

var abbc$ = ab$
  .combineLatestDelayed(bc$, (ab, bc) => ab + bc)
  .do(x => console.log('$abbc: $ab + $bc = ' + x));

console.log("Initial subscription:")
abbc$.subscribe();

setTimeout(() => b$.next(4), 100);

完整的 JS Bin here