Rxjs 包装来自 D3 (Observable) 的其他库函数

Rxjs wrapping other libraries functions from D3 (Observable)

我只是不明白这个概念,正在寻求启示。我正在尝试观察何时获取数据但混淆了过程。这是我到目前为止所拥有的。 ajax 请求是通过 d3.tsv.

完成的
var test = Rx.Observable.just(
    d3.tsv("https://gist.githubusercontent.com/mbostock/3885304/raw/37bd91278846c053188a130a01770cddff023590/data.tsv", 
        function(d) {
          return {
            letter: d.letter,
            frequency: +d.frequency
          };
        }, 
        function(error, rows) {
          console.log('mytest2',rows);
        }
    )
);

var observer = Rx.Observer.create(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

var subscription = test.subscribe(observer);

虽然这个 ajax 请求在技术上是可行的,但所有 Observable 函数都发生在数据到达之前。我如何在我的 'onNext' 日志给我数据而不是仅在 d3.tsv 函数内部获取数据的情况下构造它?

有专门用于将回调转换为可观察对象的 RxJS 运算符(.fromCallback.fromNodeCallback)。但是他们不会在这里工作,因为他们期望一个回调,而那个回调是最后一个参数。这里有两个回调,一个用于成功,一个用于结果。我不知道这种情况下有任何特殊运算符,所以我建议您使用自定义辅助函数。

function d3fn (url, success_handler, error_handler) {
  success_handler ({
    letter : 'letter',
    frequency : 9
  });
}

var d3 = {tsv : d3fn};

function fromD3Callback (d3fn, ctx) {
return function () {
  var args = Array.prototype.slice.call(arguments);
  var subject = new Rx.AsyncSubject();

  function success_handler () {
    subject.onNext.apply(subject, Array.prototype.slice.call(arguments));
    subject.onCompleted();
  }

  function error_handler () {
    subject.onError(Array.prototype.slice.call(arguments));
  }

  args.push(success_handler);
  args.push(error_handler);

  d3fn.apply(ctx, args);
  return subject.asObservable();
}
}

var test = fromD3Callback(d3.tsv)("https://gist.githubusercontent.com/mbostock/3885304/raw/37bd91278846c053188a130a01770cddff023590/data.tsv")
  .map(function(d) {
          return {
            letter: d.letter,
            frequency: +d.frequency
          };
        })
  .catch(function(error, rows) {
          console.log('mytest2',rows);
          return Rx.Observable.throw({error: error, rows: rows});
        });

var observer = Rx.Observer.create(
  function (x) { console.log('onNext: %o', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

var subscription = test.subscribe(observer);

我找到了我的问题的答案,但我想要一个更好的答案。我想出的是:

var url = "https://gist.githubusercontent.com/mbostock/3885304/raw/37bd91278846c053188a130a01770cddff023590/data.tsv"

var fetch = Rx.Observable.fromNodeCallback(d3.tsv);

var source = fetch(url, function(d) {
  return {
    letter: d.letter,
    frequency: +d.frequency
  };
})

var observer = Rx.Observer.create(
  function (o) {
        console.log('Next: success!', o);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

var subscription = source.subscribe(observer);

我唯一的问题是我不知道如何对此进行过滤,所以我愿意接受更好的解决方案。