RXjs:如何在流上创建运算符:扫描运算符,其中可以通过可观察对象重置累加器状态

RXjs : How to create an operator on streams : scan operator where the accumulator state can be reset through an observable

我需要在具有以下特征的流上创建一个新的实例运算符

我最初的尝试是修改 accumulator_fnseed$ 修改一个在 accumulator_fn 范围内的变量,这样我就可以检测函数中的变化本身。

我在这里追求两个目标:

我查看了 scan 源代码:https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/scan.js 但我不确定从那里去哪里。

有人有创建 Rxjs 流操作符的经验吗?应遵循的惯例和应避免的陷阱是什么?有没有我可以看的定制运算符的例子?你会如何实施这个特定的?

[更新]:已接受答案的一些测试代码

 var seed$ = Rx.Observable.fromEvent(document, 'keydown')
                          .map(function(ev){return ev.keyCode})
                          .startWith(0);
 var counter$ = Rx.Observable.fromEvent(document, 'mousemove')
                             .map(function(ev){return 1});
 var result$ = counter$.scanWithReset(seed$,
                                      function accumulator_fn (acc, counter) {return acc+counter});
 var s = function (x) {console.log("value: ", x)};
 var disposable = result$.subscribe(s)

移动鼠标应该显示值增加 1,按下一个键应该用按下的键的值重新启动计数器。

作为创建运算符的一般情况,通常最容易使用 Observable.create 方法,该方法本质上定义了您的 Observable 在订阅或仅包装现有运算符集时的行为方式ala share

当您更多地关注性能时,还有一些其他考虑因素(Observable.create 在规模上并不是非常有效)并且您可以考虑创建自定义 Observable,例如 map

对于你的情况,我现在推荐前者。我会把你的问题看作是几个独立的流,我们想把它们展平成一个流。每个新流将在触发重置时开始。这听起来真的很像 flatMap 对我来说:

Rx.Observable.prototype.scanWithReset = function ($reset, accum, seed) {
    var source = this;
    //Creates a new Observable
    return Rx.Observable.create(function (observer) {
        //We will be reusing this source so we want to make sure it is shared
        var p = source.publish();


        var r = $reset
            //Make sure the seed is added first
            .startWith(seed)
            //This will switch to a new sequence with the associated value
            //every time $reset fires
            .flatMapLatest(function (resetValue) {
              //Perform the scan with the latest value
              return source.scan(accum, resetValue);
        });

        //Make sure every thing gets cleaned up
        return new Rx.CompositeDisposable(
            r.subscribe(observer),
            //We are ready to start receiving from our source
            p.connect());
    });
}