计算链中除 combineLatest 之外的其他运算符,以避免冗余计算
Other operator in calculation chain than combineLatest to avoid redundant calculations
我已经使用 RxJS 成功地将一个更大的 Excel 计算迁移到 JS。任何输入数据都表示为一个 Observable,当任何 Excel 公式使用多个输入时,后续计算将使用 .map
和 .combineLatest
实现。
除一个问题外,这很好用。这是一个简化的例子:
三个输入(a$=1
、b$=2
、c$=3
)用于两个不同的计算(第一个是$ab = $a+$b = 3
,第二个是$bc = $b+$c = 5
) 作为计算最终结果的中间步骤 $abbc = $ab + $bc = 8
。
当$b现在是updated/emitting一个新的输入值时(例如4
),$abbc被计算两次——第一次是在$ab被更新时(导致错误的结果$abbc=10
) 并在更新 $bc 时再次生成正确的结果 12
.
虽然最终结果是正确的,但中间的计算既错误又多余。有什么方法可以只在 $b
更新的情况下执行最后一次计算 - 同时在更新 a$
或 c$
时仍然更新计算(这将排除 zip 运算符)。我知道这个例子显然可以简化以省去中间步骤并直接从 $a
、$b
和 $c
计算 $abbc
- 但在真实的例子中这是不是 possible/feasible.
这是 JSbin 中的 运行 示例:https://jsbin.com/pipiyodixa/edit?js,console
这里的问题是 RxJS 的行为在设计上是正确的。
确实应该先更新b
=> ab
=> abbc
然后bc
=> abbc
。它处理流中的值。
你想要的是处理 "layers".a
、b
、c
、ac
、bc
和之后计算最终值 abbc
.
我能想到的唯一方法是利用 JavaScript 执行上下文和 setTimeout(() => {}, 0)
的技巧。这样你就不会安排任何超时(实际上是真正的 timeout might be > 0
),而只是 运行 在 JavaScript 完成当前执行上下文后另一个执行上下文中的闭包。
糟糕的是,为了避免多次重新发出值,您需要缓存更多(因为 merge()
):
var b2$ = b$.cache(1);
var ab$ = a$
.combineLatest(b2$, (a, b) => a + b)
.do(x => console.log('$a + $b = ' + x))
.cache(1);
var bc$ = b2$
.combineLatest(c$, (b, c) => b + c)
.do(x => console.log('$b + $c = ' + x))
.cache(1);
var abbc$ = new Rx.Observable.merge(ab$, bc$)
.auditTime(0)
.withLatestFrom(ab$, bc$, (_, ab, bc) => ab + bc)
.do(x => console.log('$ab + $bc = ' + x));
console.log("Initial subscription:")
abbc$.subscribe();
b$.next(4);
auditTime()
运算符是这里最重要的东西。它触发 withLatestFrom()
更新它的值,而当它第一次被 ab$
或 bc$
触发时,它会忽略所有连续的发射,直到闭包执行结束(即 setTimeout()
把戏)。
观看现场演示:https://jsbin.com/zoferid/edit?js,console
此外,如果您添加 a$.next(5);
,最终计算只执行一次(这可能是好是坏 :))。
我不知道这是否能解决您的问题,因为正如我所说,RxJS 就是这样工作的。我还没有在任何更复杂的示例中对其进行测试,所以这可能不是您最后可以使用的方式。
请注意,cache()
运算符已在 RC.1 中删除,目前没有替代品:https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md
根据@martin 的(正确)回答,我创建了一个执行 combineLatestDelayed 的 rxjs 运算符:
Rx.Observable.prototype.combineLatestDelayed = function(b$, cb) {
var a2$ = this.cache(1);
var b2$ = b$.cache(1);
return a2$
.merge(b2$)
.auditTime(0)
.withLatestFrom(a2$, b2$, (_, a, b) => cb(a,b));
}
这样,我只需要将原来的.combineLatest
调用替换为.combineLatestDelayed
:
var a$ = new Rx.BehaviorSubject(1).do(x => console.log("Emitting $a=" + x));
var b$ = new Rx.BehaviorSubject(2).do(x => console.log("Emitting $b=" + x)); var b1$ = b$.cache(1);
var c$ = new Rx.BehaviorSubject(3).do(x => console.log("Emitting $c=" + x));
var ab$ = a$
.combineLatestDelayed(b1$, (a, b) => a + b)
.do(x => console.log('ab$: $a + $b = ' + x))
var bc$ = b1$
.combineLatestDelayed(c$, (b, c) => b + c)
.do(x => console.log('bc$: $b + $c = ' + x))
var abbc$ = ab$
.combineLatestDelayed(bc$, (ab, bc) => ab + bc)
.do(x => console.log('$abbc: $ab + $bc = ' + x));
console.log("Initial subscription:")
abbc$.subscribe();
setTimeout(() => b$.next(4), 100);
完整的 JS Bin here
我已经使用 RxJS 成功地将一个更大的 Excel 计算迁移到 JS。任何输入数据都表示为一个 Observable,当任何 Excel 公式使用多个输入时,后续计算将使用 .map
和 .combineLatest
实现。
除一个问题外,这很好用。这是一个简化的例子:
三个输入(a$=1
、b$=2
、c$=3
)用于两个不同的计算(第一个是$ab = $a+$b = 3
,第二个是$bc = $b+$c = 5
) 作为计算最终结果的中间步骤 $abbc = $ab + $bc = 8
。
当$b现在是updated/emitting一个新的输入值时(例如4
),$abbc被计算两次——第一次是在$ab被更新时(导致错误的结果$abbc=10
) 并在更新 $bc 时再次生成正确的结果 12
.
虽然最终结果是正确的,但中间的计算既错误又多余。有什么方法可以只在 $b
更新的情况下执行最后一次计算 - 同时在更新 a$
或 c$
时仍然更新计算(这将排除 zip 运算符)。我知道这个例子显然可以简化以省去中间步骤并直接从 $a
、$b
和 $c
计算 $abbc
- 但在真实的例子中这是不是 possible/feasible.
这是 JSbin 中的 运行 示例:https://jsbin.com/pipiyodixa/edit?js,console
这里的问题是 RxJS 的行为在设计上是正确的。
确实应该先更新b
=> ab
=> abbc
然后bc
=> abbc
。它处理流中的值。
你想要的是处理 "layers".a
、b
、c
、ac
、bc
和之后计算最终值 abbc
.
我能想到的唯一方法是利用 JavaScript 执行上下文和 setTimeout(() => {}, 0)
的技巧。这样你就不会安排任何超时(实际上是真正的 timeout might be > 0
),而只是 运行 在 JavaScript 完成当前执行上下文后另一个执行上下文中的闭包。
糟糕的是,为了避免多次重新发出值,您需要缓存更多(因为 merge()
):
var b2$ = b$.cache(1);
var ab$ = a$
.combineLatest(b2$, (a, b) => a + b)
.do(x => console.log('$a + $b = ' + x))
.cache(1);
var bc$ = b2$
.combineLatest(c$, (b, c) => b + c)
.do(x => console.log('$b + $c = ' + x))
.cache(1);
var abbc$ = new Rx.Observable.merge(ab$, bc$)
.auditTime(0)
.withLatestFrom(ab$, bc$, (_, ab, bc) => ab + bc)
.do(x => console.log('$ab + $bc = ' + x));
console.log("Initial subscription:")
abbc$.subscribe();
b$.next(4);
auditTime()
运算符是这里最重要的东西。它触发 withLatestFrom()
更新它的值,而当它第一次被 ab$
或 bc$
触发时,它会忽略所有连续的发射,直到闭包执行结束(即 setTimeout()
把戏)。
观看现场演示:https://jsbin.com/zoferid/edit?js,console
此外,如果您添加 a$.next(5);
,最终计算只执行一次(这可能是好是坏 :))。
我不知道这是否能解决您的问题,因为正如我所说,RxJS 就是这样工作的。我还没有在任何更复杂的示例中对其进行测试,所以这可能不是您最后可以使用的方式。
请注意,cache()
运算符已在 RC.1 中删除,目前没有替代品:https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md
根据@martin 的(正确)回答,我创建了一个执行 combineLatestDelayed 的 rxjs 运算符:
Rx.Observable.prototype.combineLatestDelayed = function(b$, cb) {
var a2$ = this.cache(1);
var b2$ = b$.cache(1);
return a2$
.merge(b2$)
.auditTime(0)
.withLatestFrom(a2$, b2$, (_, a, b) => cb(a,b));
}
这样,我只需要将原来的.combineLatest
调用替换为.combineLatestDelayed
:
var a$ = new Rx.BehaviorSubject(1).do(x => console.log("Emitting $a=" + x));
var b$ = new Rx.BehaviorSubject(2).do(x => console.log("Emitting $b=" + x)); var b1$ = b$.cache(1);
var c$ = new Rx.BehaviorSubject(3).do(x => console.log("Emitting $c=" + x));
var ab$ = a$
.combineLatestDelayed(b1$, (a, b) => a + b)
.do(x => console.log('ab$: $a + $b = ' + x))
var bc$ = b1$
.combineLatestDelayed(c$, (b, c) => b + c)
.do(x => console.log('bc$: $b + $c = ' + x))
var abbc$ = ab$
.combineLatestDelayed(bc$, (ab, bc) => ab + bc)
.do(x => console.log('$abbc: $ab + $bc = ' + x));
console.log("Initial subscription:")
abbc$.subscribe();
setTimeout(() => b$.next(4), 100);
完整的 JS Bin here