RXjs:如何在流上创建运算符:扫描运算符,其中可以通过可观察对象重置累加器状态
RXjs : How to create an operator on streams : scan operator where the accumulator state can be reset through an observable
我需要在具有以下特征的流上创建一个新的实例运算符
签名
Rx.Observable.prototype.scan_with_reset(accumulator, seed$)
其中:
参数
accumulator (Function)
: 在每个元素上调用的累加器函数。
seed$ (Observable)
:一个可观察对象,其值将用于重新启动累加器函数。累加器函数具有以下签名 function accumulator_fn(accumulator_state, source_value)
。我希望 seed$
中的值将 accumulator_state
重置为 seed
值并发出 seed
值。
Returns
(Observable) :由 comonadic bind 操作产生的可观察序列(无论那意味着什么,我在这里复制 Rxjs
文档)。比。正常的 scan
运算符,这里发生的是,当累加器函数是 'restarted' 来自 seed$
可观察对象发出的种子值时,发出该种子值,下一个值是scan_with_reset
运算符发出的将是 accumulator_fn(seed, source_value)
使用示例:
var seed$ = Rx.Observable.fromEvent(document, 'keydown')
.map(function(ev){return ev.keyCode})
.startWith(0);
var result$ = counter$.scan_with_reset(seed$,
function accumulator_fn (acc, counter) {return acc+counter});
下图应该更详细地解释预期结果:
seed : 0---------13--------27------------
counter : -1--5--2----6---2-----4---1---3---
result : 0-1--6--8-13-19--21-27-31--32--35-
我最初的尝试是修改 accumulator_fn
让 seed$
修改一个在 accumulator_fn
范围内的变量,这样我就可以检测函数中的变化本身。
我在这里追求两个目标:
- 有一个尽可能无状态和无闭包的实现
- 了解定义自己的运算符背后的机制
流,希望这只是一个简单的例子
我查看了 scan
源代码:https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/scan.js
但我不确定从那里去哪里。
有人有创建 Rxjs 流操作符的经验吗?应遵循的惯例和应避免的陷阱是什么?有没有我可以看的定制运算符的例子?你会如何实施这个特定的?
[更新]:已接受答案的一些测试代码
var seed$ = Rx.Observable.fromEvent(document, 'keydown')
.map(function(ev){return ev.keyCode})
.startWith(0);
var counter$ = Rx.Observable.fromEvent(document, 'mousemove')
.map(function(ev){return 1});
var result$ = counter$.scanWithReset(seed$,
function accumulator_fn (acc, counter) {return acc+counter});
var s = function (x) {console.log("value: ", x)};
var disposable = result$.subscribe(s)
移动鼠标应该显示值增加 1,按下一个键应该用按下的键的值重新启动计数器。
作为创建运算符的一般情况,通常最容易使用 Observable.create
方法,该方法本质上定义了您的 Observable
在订阅或仅包装现有运算符集时的行为方式ala share
。
当您更多地关注性能时,还有一些其他考虑因素(Observable.create
在规模上并不是非常有效)并且您可以考虑创建自定义 Observable
,例如 map。
对于你的情况,我现在推荐前者。我会把你的问题看作是几个独立的流,我们想把它们展平成一个流。每个新流将在触发重置时开始。这听起来真的很像 flatMap
对我来说:
Rx.Observable.prototype.scanWithReset = function ($reset, accum, seed) {
var source = this;
//Creates a new Observable
return Rx.Observable.create(function (observer) {
//We will be reusing this source so we want to make sure it is shared
var p = source.publish();
var r = $reset
//Make sure the seed is added first
.startWith(seed)
//This will switch to a new sequence with the associated value
//every time $reset fires
.flatMapLatest(function (resetValue) {
//Perform the scan with the latest value
return source.scan(accum, resetValue);
});
//Make sure every thing gets cleaned up
return new Rx.CompositeDisposable(
r.subscribe(observer),
//We are ready to start receiving from our source
p.connect());
});
}
我需要在具有以下特征的流上创建一个新的实例运算符
签名
Rx.Observable.prototype.scan_with_reset(accumulator, seed$)
其中:
参数
accumulator (Function)
: 在每个元素上调用的累加器函数。seed$ (Observable)
:一个可观察对象,其值将用于重新启动累加器函数。累加器函数具有以下签名function accumulator_fn(accumulator_state, source_value)
。我希望seed$
中的值将accumulator_state
重置为seed
值并发出seed
值。Returns (Observable) :由 comonadic bind 操作产生的可观察序列(无论那意味着什么,我在这里复制
Rxjs
文档)。比。正常的scan
运算符,这里发生的是,当累加器函数是 'restarted' 来自seed$
可观察对象发出的种子值时,发出该种子值,下一个值是scan_with_reset
运算符发出的将是accumulator_fn(seed, source_value)
使用示例:
var seed$ = Rx.Observable.fromEvent(document, 'keydown') .map(function(ev){return ev.keyCode}) .startWith(0); var result$ = counter$.scan_with_reset(seed$, function accumulator_fn (acc, counter) {return acc+counter});
下图应该更详细地解释预期结果:
seed : 0---------13--------27------------ counter : -1--5--2----6---2-----4---1---3--- result : 0-1--6--8-13-19--21-27-31--32--35-
我最初的尝试是修改 accumulator_fn
让 seed$
修改一个在 accumulator_fn
范围内的变量,这样我就可以检测函数中的变化本身。
我在这里追求两个目标:
- 有一个尽可能无状态和无闭包的实现
- 了解定义自己的运算符背后的机制 流,希望这只是一个简单的例子
我查看了 scan
源代码:https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/scan.js
但我不确定从那里去哪里。
有人有创建 Rxjs 流操作符的经验吗?应遵循的惯例和应避免的陷阱是什么?有没有我可以看的定制运算符的例子?你会如何实施这个特定的?
[更新]:已接受答案的一些测试代码
var seed$ = Rx.Observable.fromEvent(document, 'keydown')
.map(function(ev){return ev.keyCode})
.startWith(0);
var counter$ = Rx.Observable.fromEvent(document, 'mousemove')
.map(function(ev){return 1});
var result$ = counter$.scanWithReset(seed$,
function accumulator_fn (acc, counter) {return acc+counter});
var s = function (x) {console.log("value: ", x)};
var disposable = result$.subscribe(s)
移动鼠标应该显示值增加 1,按下一个键应该用按下的键的值重新启动计数器。
作为创建运算符的一般情况,通常最容易使用 Observable.create
方法,该方法本质上定义了您的 Observable
在订阅或仅包装现有运算符集时的行为方式ala share
。
当您更多地关注性能时,还有一些其他考虑因素(Observable.create
在规模上并不是非常有效)并且您可以考虑创建自定义 Observable
,例如 map。
对于你的情况,我现在推荐前者。我会把你的问题看作是几个独立的流,我们想把它们展平成一个流。每个新流将在触发重置时开始。这听起来真的很像 flatMap
对我来说:
Rx.Observable.prototype.scanWithReset = function ($reset, accum, seed) {
var source = this;
//Creates a new Observable
return Rx.Observable.create(function (observer) {
//We will be reusing this source so we want to make sure it is shared
var p = source.publish();
var r = $reset
//Make sure the seed is added first
.startWith(seed)
//This will switch to a new sequence with the associated value
//every time $reset fires
.flatMapLatest(function (resetValue) {
//Perform the scan with the latest value
return source.scan(accum, resetValue);
});
//Make sure every thing gets cleaned up
return new Rx.CompositeDisposable(
r.subscribe(observer),
//We are ready to start receiving from our source
p.connect());
});
}