Rxjs 包装来自 D3 (Observable) 的其他库函数
Rxjs wrapping other libraries functions from D3 (Observable)
我只是不明白这个概念,正在寻求启示。我正在尝试观察何时获取数据但混淆了过程。这是我到目前为止所拥有的。 ajax 请求是通过 d3.tsv.
完成的
var test = Rx.Observable.just(
d3.tsv("https://gist.githubusercontent.com/mbostock/3885304/raw/37bd91278846c053188a130a01770cddff023590/data.tsv",
function(d) {
return {
letter: d.letter,
frequency: +d.frequency
};
},
function(error, rows) {
console.log('mytest2',rows);
}
)
);
var observer = Rx.Observer.create(
function (x) { console.log('onNext: %s', x); },
function (e) { console.log('onError: %s', e); },
function () { console.log('onCompleted'); });
var subscription = test.subscribe(observer);
虽然这个 ajax 请求在技术上是可行的,但所有 Observable 函数都发生在数据到达之前。我如何在我的 'onNext' 日志给我数据而不是仅在 d3.tsv 函数内部获取数据的情况下构造它?
有专门用于将回调转换为可观察对象的 RxJS 运算符(.fromCallback
、.fromNodeCallback
)。但是他们不会在这里工作,因为他们期望一个回调,而那个回调是最后一个参数。这里有两个回调,一个用于成功,一个用于结果。我不知道这种情况下有任何特殊运算符,所以我建议您使用自定义辅助函数。
function d3fn (url, success_handler, error_handler) {
success_handler ({
letter : 'letter',
frequency : 9
});
}
var d3 = {tsv : d3fn};
function fromD3Callback (d3fn, ctx) {
return function () {
var args = Array.prototype.slice.call(arguments);
var subject = new Rx.AsyncSubject();
function success_handler () {
subject.onNext.apply(subject, Array.prototype.slice.call(arguments));
subject.onCompleted();
}
function error_handler () {
subject.onError(Array.prototype.slice.call(arguments));
}
args.push(success_handler);
args.push(error_handler);
d3fn.apply(ctx, args);
return subject.asObservable();
}
}
var test = fromD3Callback(d3.tsv)("https://gist.githubusercontent.com/mbostock/3885304/raw/37bd91278846c053188a130a01770cddff023590/data.tsv")
.map(function(d) {
return {
letter: d.letter,
frequency: +d.frequency
};
})
.catch(function(error, rows) {
console.log('mytest2',rows);
return Rx.Observable.throw({error: error, rows: rows});
});
var observer = Rx.Observer.create(
function (x) { console.log('onNext: %o', x); },
function (e) { console.log('onError: %s', e); },
function () { console.log('onCompleted'); });
var subscription = test.subscribe(observer);
我找到了我的问题的答案,但我想要一个更好的答案。我想出的是:
var url = "https://gist.githubusercontent.com/mbostock/3885304/raw/37bd91278846c053188a130a01770cddff023590/data.tsv"
var fetch = Rx.Observable.fromNodeCallback(d3.tsv);
var source = fetch(url, function(d) {
return {
letter: d.letter,
frequency: +d.frequency
};
})
var observer = Rx.Observer.create(
function (o) {
console.log('Next: success!', o);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
var subscription = source.subscribe(observer);
我唯一的问题是我不知道如何对此进行过滤,所以我愿意接受更好的解决方案。
我只是不明白这个概念,正在寻求启示。我正在尝试观察何时获取数据但混淆了过程。这是我到目前为止所拥有的。 ajax 请求是通过 d3.tsv.
完成的var test = Rx.Observable.just(
d3.tsv("https://gist.githubusercontent.com/mbostock/3885304/raw/37bd91278846c053188a130a01770cddff023590/data.tsv",
function(d) {
return {
letter: d.letter,
frequency: +d.frequency
};
},
function(error, rows) {
console.log('mytest2',rows);
}
)
);
var observer = Rx.Observer.create(
function (x) { console.log('onNext: %s', x); },
function (e) { console.log('onError: %s', e); },
function () { console.log('onCompleted'); });
var subscription = test.subscribe(observer);
虽然这个 ajax 请求在技术上是可行的,但所有 Observable 函数都发生在数据到达之前。我如何在我的 'onNext' 日志给我数据而不是仅在 d3.tsv 函数内部获取数据的情况下构造它?
有专门用于将回调转换为可观察对象的 RxJS 运算符(.fromCallback
、.fromNodeCallback
)。但是他们不会在这里工作,因为他们期望一个回调,而那个回调是最后一个参数。这里有两个回调,一个用于成功,一个用于结果。我不知道这种情况下有任何特殊运算符,所以我建议您使用自定义辅助函数。
function d3fn (url, success_handler, error_handler) { success_handler ({ letter : 'letter', frequency : 9 }); } var d3 = {tsv : d3fn}; function fromD3Callback (d3fn, ctx) { return function () { var args = Array.prototype.slice.call(arguments); var subject = new Rx.AsyncSubject(); function success_handler () { subject.onNext.apply(subject, Array.prototype.slice.call(arguments)); subject.onCompleted(); } function error_handler () { subject.onError(Array.prototype.slice.call(arguments)); } args.push(success_handler); args.push(error_handler); d3fn.apply(ctx, args); return subject.asObservable(); } } var test = fromD3Callback(d3.tsv)("https://gist.githubusercontent.com/mbostock/3885304/raw/37bd91278846c053188a130a01770cddff023590/data.tsv") .map(function(d) { return { letter: d.letter, frequency: +d.frequency }; }) .catch(function(error, rows) { console.log('mytest2',rows); return Rx.Observable.throw({error: error, rows: rows}); }); var observer = Rx.Observer.create( function (x) { console.log('onNext: %o', x); }, function (e) { console.log('onError: %s', e); }, function () { console.log('onCompleted'); }); var subscription = test.subscribe(observer);
我找到了我的问题的答案,但我想要一个更好的答案。我想出的是:
var url = "https://gist.githubusercontent.com/mbostock/3885304/raw/37bd91278846c053188a130a01770cddff023590/data.tsv"
var fetch = Rx.Observable.fromNodeCallback(d3.tsv);
var source = fetch(url, function(d) {
return {
letter: d.letter,
frequency: +d.frequency
};
})
var observer = Rx.Observer.create(
function (o) {
console.log('Next: success!', o);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
var subscription = source.subscribe(observer);
我唯一的问题是我不知道如何对此进行过滤,所以我愿意接受更好的解决方案。