使用 rx.js,我如何从计时器上的现有可观察序列发出记忆化结果?
Using rx.js, how do I emit a memoized result from an existing observable sequence on a timer?
我目前正在自学使用 rxjs 进行反应式编程,并且我给自己设定了一个挑战,即创建一个可观察流,无论如何都会向订阅者发出相同的结果。
我已经记住了给定特定 URL 的 HTTP "GET" 流的创建,并且我尝试每两秒对该流进行一次操作,结果是每次计时器滴答声,我将从原始流中提取 cached/memoized HTTP 结果。
import superagent from 'superagent';
import _ from 'lodash';
// Cached GET function, returning a stream that emits the HTTP response object
var httpget = _.memoize(function(url) {
var req = superagent.get(url);
req = req.end.bind(req);
return Rx.Observable.fromNodeCallback(req)();
});
// Assume this is created externally and I only have access to response$
var response$ = httpget('/ontologies/acl.ttl');
// Every two seconds, emit the memoized HTTP response
Rx.Observable.timer(0, 2000)
.map(() => response$)
.flatMap($ => $)
.subscribe(response => {
console.log('Got the response!');
});
我确信我必须在某个地方坚持对 replay()
的调用,但无论我做什么,每两秒都会启动一个新的 HTTP 调用。我如何构建它以便我可以从 URL 构造一个可观察对象并让它始终向任何后续订阅者发出相同的 HTTP 结果?
编辑
我找到了一种方法来获得我想要的结果,但我觉得我遗漏了一些东西,并且应该能够用更精简的方法重构它:
var httpget = _.memoize(function(url) {
var subject = new Rx.ReplaySubject();
try {
superagent.get(url).end((err, res) => {
if(err) {
subject.onError(err);
}
else {
subject.onNext(res);
subject.onCompleted();
}
});
}
catch(e) {
subject.onError(e);
}
return subject.asObservable();
});
您的第一个代码示例实际上更接近实现它的方式
var httpget = _.memoize(function(url) {
var req = superagent.get(url);
return Rx.Observable.fromNodeCallback(req.end, req)();
});
但是,这不起作用,因为 fromNodeCallback
中似乎存在错误。至于解决这个问题,我认为您实际上是在寻找 AsyncSubject
而不是 ReplaySubject
。后者有效,但前者正是为这种情况而设计的(如果这对您很重要,则没有创建数组的开销 + 运行时检查缓存过期)。
var httpget = _.memoize(function(url) {
var subject = new Rx.AsyncSubject();
var req = superagent.get(url);
Rx.Observable.fromNodeCallback(req.end, req)().subscribe(subject);
return subject.asObservable();
});
最后,虽然 map
很欣赏您的想法,但您可以通过使用直接采用 Observable
的 flatMap
重载来简化 timer
代码:
Rx.Observable.timer(0, 2000)
.flatMap($response)
.subscribe(response => {
console.log('Got the response');
});
除非我没听错你的问题,否则 Observable.combineLatest
会为你做到这一点,它会缓存你的可观察对象的最后一个发射值。
此代码发送一次请求,然后每 200 毫秒给出相同的缓存响应:
import reqPromise from 'request-promise';
import {Observable} from 'rx';
let httpGet_ = (url) =>
Observable
.combineLatest(
Observable.interval(200),
reqPromise(url),
(count, response) => response
);
httpGet_('http://google.com/')
.subscribe(
x => console.log(x),
e => console.error(e)
);
我目前正在自学使用 rxjs 进行反应式编程,并且我给自己设定了一个挑战,即创建一个可观察流,无论如何都会向订阅者发出相同的结果。
我已经记住了给定特定 URL 的 HTTP "GET" 流的创建,并且我尝试每两秒对该流进行一次操作,结果是每次计时器滴答声,我将从原始流中提取 cached/memoized HTTP 结果。
import superagent from 'superagent';
import _ from 'lodash';
// Cached GET function, returning a stream that emits the HTTP response object
var httpget = _.memoize(function(url) {
var req = superagent.get(url);
req = req.end.bind(req);
return Rx.Observable.fromNodeCallback(req)();
});
// Assume this is created externally and I only have access to response$
var response$ = httpget('/ontologies/acl.ttl');
// Every two seconds, emit the memoized HTTP response
Rx.Observable.timer(0, 2000)
.map(() => response$)
.flatMap($ => $)
.subscribe(response => {
console.log('Got the response!');
});
我确信我必须在某个地方坚持对 replay()
的调用,但无论我做什么,每两秒都会启动一个新的 HTTP 调用。我如何构建它以便我可以从 URL 构造一个可观察对象并让它始终向任何后续订阅者发出相同的 HTTP 结果?
编辑
我找到了一种方法来获得我想要的结果,但我觉得我遗漏了一些东西,并且应该能够用更精简的方法重构它:
var httpget = _.memoize(function(url) {
var subject = new Rx.ReplaySubject();
try {
superagent.get(url).end((err, res) => {
if(err) {
subject.onError(err);
}
else {
subject.onNext(res);
subject.onCompleted();
}
});
}
catch(e) {
subject.onError(e);
}
return subject.asObservable();
});
您的第一个代码示例实际上更接近实现它的方式
var httpget = _.memoize(function(url) {
var req = superagent.get(url);
return Rx.Observable.fromNodeCallback(req.end, req)();
});
但是,这不起作用,因为 fromNodeCallback
中似乎存在错误。至于解决这个问题,我认为您实际上是在寻找 AsyncSubject
而不是 ReplaySubject
。后者有效,但前者正是为这种情况而设计的(如果这对您很重要,则没有创建数组的开销 + 运行时检查缓存过期)。
var httpget = _.memoize(function(url) {
var subject = new Rx.AsyncSubject();
var req = superagent.get(url);
Rx.Observable.fromNodeCallback(req.end, req)().subscribe(subject);
return subject.asObservable();
});
最后,虽然 map
很欣赏您的想法,但您可以通过使用直接采用 Observable
的 flatMap
重载来简化 timer
代码:
Rx.Observable.timer(0, 2000)
.flatMap($response)
.subscribe(response => {
console.log('Got the response');
});
除非我没听错你的问题,否则 Observable.combineLatest
会为你做到这一点,它会缓存你的可观察对象的最后一个发射值。
此代码发送一次请求,然后每 200 毫秒给出相同的缓存响应:
import reqPromise from 'request-promise';
import {Observable} from 'rx';
let httpGet_ = (url) =>
Observable
.combineLatest(
Observable.interval(200),
reqPromise(url),
(count, response) => response
);
httpGet_('http://google.com/')
.subscribe(
x => console.log(x),
e => console.error(e)
);