RxJS v5+ 中的 `Observable.transduce` 发生了什么?

Whatever happened to `Observable.transduce` in RxJS v5+?

RxJS v4 曾经有一个 Observable.transduce 方法,它采用了一个转换器。这允许使用独立于库的换能器运算符,这些运算符在过去具有主要的性能优势。

来源

RxJS v5.5 和 v6 具有可管道运算符,并且 v6 删除了方法链接。因此,我假设 RxJS 运算符是标准的转换器。翻了一下源码,好像不是这样的。

RxJS v6 运算符的功能类似于转换器,其中每个值在下一个值通过之前完全通过链传递,但 RxJS v6 运算符没有使用我在其他库中看到的标准转换器方法,这意味着,我不要认为它们是便携式的。

关于传感器的全部事情是他们对集合本身一无所知。您可以编写 100 个普遍适用于任何集合或流类型的运算符,而不是专门为可观察对象编写 100 个运算符。

.pipe 是否与 .transduce 一致,或者此方法是否已在 RxJS v5 中完全删除?

我有完全相同的问题,但无法在任何地方找到答案。是的,你可以 pipe,但我相信这会为每个操作员创建中介可观察对象。不过我不确定,这与阅读代码有关。

所以我想出了自己的 transduce 运算符:

function transformForObserver(o) {
  return {
    "@@transducer/init": function() {
      return o;
    },
    "@@transducer/step": function(obs, input) {
      return obs.next(input);
    },
    "@@transducer/result": function(obs) {
      return obs.complete();
    }
  };
}

    const transduce = (obs, transducer) => {
      const xform = transducer(transformForObserver);

      return Observable.create(o => {
        return obs.subscribe({
          next: x => {
            const res = tryCatch(
              xform["@@transducer/step"],
              err => {
                console.error(`Error occurred in transducer/step!`, err);
                return err;
              }
            )(xform, o, x);

            if (res instanceof Error) { o.error(res); }
          },
          error: err => {
            console.error(`Error occurred in observable passed to Rx transduce fn!`, err);
            o.error(err);
          },
          complete: () => {o.complete();}
        });
      });
    }

还没有测试过,如果有兴趣的话会post很快。

Update :我 fork 了 jslongser 的 transducers 库,并在其中包含了这样的转换器。 fork是https://github.com/brucou/transducers.js, and the function is transduceLazyObservable. Cf. tests使用示例