我怎样才能将一个可观察对象变成一个由长轮询可观察对象组成的可观察对象,这些可观察对象在特定值上完成?

How can I turn an observable into an observable of long polling observables which complete on a specific value?

我正在使用 RxJs 创建一个交互式网页。

这就是我想要实现的:

我有一个生成令牌的应用程序。这些令牌可以被外部实体使用。

当用户创建令牌时,页面开始轮询网络服务器的状态(已使用或未使用)。消耗令牌后,页面刷新。

因此,创建令牌后,每2秒向服务器发送一个请求,询问令牌是否已被使用。

我有 Observable 个字符串代表我的 generatedTokens

我实际上已经有了一个使用 Rx.Scheduler.default class 的有效实现,它允许我手动执行操作。但是,我不禁觉得应该有一个更简单、更优雅的解决方案。

这是当前代码:

class TokenStore {
  constructor(tokenService, scheduler) {
    // actual implementation omitted for clarity
    this.generatedTokens = Rx.Observable.just(["token1", "token2"]);

    this.consumedTokens = this.generatedTokens
      .flatMap(token => 
        Rx.Observable.create(function(observer) {
          var notify = function() {
            observer.onNext(token);
            observer.onCompleted();
          };
          var poll = function() {
            scheduler.scheduleWithRelative(2000, function() {
                // tokenService.isTokenConsumed returns a promise that resolves with a boolean
                tokenService.isTokenConsumed(token)
                  .then(isConsumed => isConsumed ? notify() : poll());
              }
            );
          };
          poll();
        }));
  }
}

是否有类似 "repeatUntil" 的方法?我正在寻找与上面的代码做同样事情的实现,但看起来更像这样:

class TokenStore {
  constructor(tokenService, scheduler) {
    // actual implementation omitted for clarity
    this.generatedTokens = Rx.Observable.just(["token1", "token2"]);

    this.consumedTokens = this.generatedTokens
      .flatMap(token =>
        Rx.Observable.fromPromise(tokenService.isTokenConsumed(token))
                     .delay(2000, scheduler)
                      // is this possible?
                     .repeatUntil(isConsumed => isConsumed === true));
  }
} 

有趣的是,在发布问题几分钟后,答案让我印象深刻。我想 rubberducking 可能不会那么傻。

总之,答案由两部分组成:

  • repeatUntil 可以通过 repeat()filter()first()

  • 的组合来实现
  • fromPromise 有一些内部惰性缓存机制,导致后续订阅不会触发新的 AJAX 请求。因此我不得不重新使用 Rx.Observable.create

解决方法:

class TokenStore {
  constructor(tokenService, scheduler) {
    // actual implementation omitted for clarity
    this.generatedTokens = Rx.Observable.just(["token1", "token2"]);

    this.consumedTokens = this.generatedTokens
      .flatMap(token =>
        // must use defer otherwise it doesnt retrigger call upon subscription
        Rx.Observable
        .defer(() => tokenService.isTokenConsumed(token))
        .delay(2000, scheduler)
        .repeat()
        .filter(isConsumed => isConsumed === true)
        .first())
    .share();
  }
} 

一个小旁注:"share()" 确保两个可观察对象都是热的,这避免了每个订阅者都会导致 ajax 请求开始触发的情况。

class TokenSource {
  constructor(tokenService, scheduler) {
    this.generatedTokens = Rx.Observable.just(["token1", "token2"]).share();

    this.consumedTokens = this.generatedTokens
      .flatMap(token => 
         Rx.Observable.interval(2000, scheduler)
               .flatMap(Rx.Observable.defer(() => 
                          tokenService.isTokenConsumed(token)))
               .first(isConsumed => isConsumed === true))
      .share()

  }
}

您可以利用两个事实:

  1. flatMap 有一个重载,它接受一个 observable,每次新事件进入时都会重新订阅

  2. defer 可以采用返回承诺的方法。该方法将在每次订阅时重新执行,这意味着您不必自己滚动 Promise->Observable 转换。