如何使轮询可观察到检测未实现的承诺

How to make polling observable detect unfullfilled promise

此可观察轮询 getPromise() 每秒运行一次。在 getPromise() 函数 returns 3 承诺它停止解析它们之后。我如何检测 getPromise() 函数没有 resolve/rejected 过去的任何承诺,比方说,2 秒,然后调用 onError 处理程序。我试过让它与 timeout 运算符一起工作但无济于事。有什么想法吗?

Rx.Observable.interval(1000)
  .switchMap(() => Rx.Observable.fromPromise(getPromise()))
  .subscribe(onValue, onError);

function onValue(value){
  console.log('value: ', value);
}
function onError(error){
  console.log('error: ', error);
}
var getPromise = (function(){
  var counter = 3;
  return function(){
    return new Promise(function(resolve, reject){
      if(counter > 0) resolve(1);
      counter--;
    })
  }
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>

您可以使用仅订阅第一个发出的 Observable 的 race 运算符。

你说你想在 2 次不活动后调用 onError 处理程序。这与使用 switchMap 相矛盾,后者会在回调返回新的 Observable 时自动取消订阅。因此,您可能希望使用 exhaustMap instead. Also when you emit an error notification the chain unsubscribes and you'll never receive any other value. This means that you shouldn't emit the timeout as an error or use also the retry 运算符来自动重新订阅(但这实际上取决于您要实现的目标)。

这是您更新的示例,仅使用 race() 运算符。

Rx.Observable.interval(1000)
  .switchMap(() => 
    Rx.Observable.race(
      Rx.Observable.fromPromise(getPromise()),
      Rx.Observable.timer(0, 1000).mapTo(42)
    )
  )
  .subscribe(onValue, onError);

function onValue(value){
  console.log('value: ', value);
}
function onError(error){
  console.log('error: ', error);
}
var getPromise = (function(){
  var counter = 3;
  return function(){
    return new Promise(function(resolve, reject){
      if(counter > 0) resolve(1);
      counter--;
    })
  }
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>

编辑:在 2 秒不活动后发送单个错误通知。

Rx.Observable.interval(1000)
  .switchMap(() => Rx.Observable.fromPromise(getPromise()))
  .timeout(2000)
  .subscribe(onValue, onError);

function onValue(value){
  console.log('value: ', value);
}
function onError(error){
  console.log('error: ', error);
}
var getPromise = (function(){
  var counter = 3;
  return function(){
    return new Promise(function(resolve, reject){
      if(counter > 0) resolve(1);
      counter--;
    })
  }
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.2.0/Rx.js"></script>

5.3.0 中确实存在一个错误,不直接在 timeout() 运算符中,而是在调度异步操作中。 https://github.com/ReactiveX/rxjs/pull/2580

没有 timeout() 运算符:

Rx.Observable.interval(1000)
  .switchMap(() =>
    Rx.Observable.race(
      Rx.Observable.fromPromise(getPromise()),
      Rx.Observable.timer(0, 2000).map(function(_) {
        throw new Error('timeout');
      })
    )
  )
  .subscribe(onValue, onError);