使用 rx.js,我如何从计时器上的现有可观察序列发出记忆化结果?

Using rx.js, how do I emit a memoized result from an existing observable sequence on a timer?

我目前正在自学使用 rxjs 进行反应式编程,并且我给自己设定了一个挑战,即创建一个可观察流,无论如何都会向订阅者发出相同的结果。

我已经记住了给定特定 URL 的 HTTP "GET" 流的创建,并且我尝试每两秒对该流进行一次操作,结果是每次计时器滴答声,我将从原始流中提取 cached/memoized HTTP 结果。

import superagent from 'superagent';
import _ from 'lodash';

// Cached GET function, returning a stream that emits the HTTP response object
var httpget = _.memoize(function(url) {
  var req = superagent.get(url);
  req = req.end.bind(req);
  return Rx.Observable.fromNodeCallback(req)();
});

// Assume this is created externally and I only have access to response$
var response$ = httpget('/ontologies/acl.ttl');

// Every two seconds, emit the memoized HTTP response
Rx.Observable.timer(0, 2000)
  .map(() => response$)
  .flatMap($ => $)
  .subscribe(response => {
    console.log('Got the response!');
  });

我确信我必须在某个地方坚持对 replay() 的调用,但无论我做什么,每两秒都会启动一个新的 HTTP 调用。我如何构建它以便我可以从 URL 构造一个可观察对象并让它始终向任何后续订阅者发出相同的 HTTP 结果?

编辑
我找到了一种方法来获得我想要的结果,但我觉得我遗漏了一些东西,并且应该能够用更精简的方法重构它:

var httpget = _.memoize(function(url) {
  var subject = new Rx.ReplaySubject();
  try {
    superagent.get(url).end((err, res) => {
      if(err) {
        subject.onError(err);
      }
      else {
        subject.onNext(res);
        subject.onCompleted();
      }
    });
  }
  catch(e) {
    subject.onError(e);
  }
  return subject.asObservable();
});

您的第一个代码示例实际上更接近实现它的方式

var httpget = _.memoize(function(url) {
  var req = superagent.get(url);
  return Rx.Observable.fromNodeCallback(req.end, req)();
});

但是,这不起作用,因为 fromNodeCallback 中似乎存在错误。至于解决这个问题,我认为您实际上是在寻找 AsyncSubject 而不是 ReplaySubject。后者有效,但前者正是为这种情况而设计的(如果这对您很重要,则没有创建数组的开销 + 运行时检查缓存过期)。

var httpget = _.memoize(function(url) {

  var subject = new Rx.AsyncSubject();
  var req = superagent.get(url);
  Rx.Observable.fromNodeCallback(req.end, req)().subscribe(subject);
  return subject.asObservable();

});

最后,虽然 map 很欣赏您的想法,但您可以通过使用直接采用 ObservableflatMap 重载来简化 timer 代码:

Rx.Observable.timer(0, 2000)
  .flatMap($response)
  .subscribe(response => {
    console.log('Got the response');
  });

除非我没听错你的问题,否则 Observable.combineLatest 会为你做到这一点,它会缓存你的可观察对象的最后一个发射值。

此代码发送一次请求,然后每 200 毫秒给出相同的缓存响应:

import reqPromise from 'request-promise';
import {Observable} from 'rx';

let httpGet_ = (url) =>
      Observable
      .combineLatest(
        Observable.interval(200),
        reqPromise(url),
        (count, response) => response
      );

httpGet_('http://google.com/')
  .subscribe(
    x => console.log(x),
    e => console.error(e)
  );