Rx.js:使用 promise 和 take 异步检查项目序列,直到返回 true

Rx.js: check sequence of items asynchronously with promise and take until true is returned

我是 Rx 的新手。我有一组用户名,我正在针对数据库测试它们,直到我找到一个不存在的用户名(一个免费的)。数据库调用给了我一个承诺。

如何使用 rx(.js) 实现它?

到目前为止我实现的代码是:

用户名流

var usernamesStream = Rx.Observable.from(['user1','user2','user3'])

数据库查询方式

var checkUsernameIsFree = function (username) {
  return db.users.find({username:username}).toArray().then(function(users) {
    return users.length == 0
  })
}

我想我应该在承诺中使用 .fromPromise 创建一个流。我如何实现它并加入 2 个流,以便 subscribe 仅使用第一个免费用户名被调用?

您可以使用 concatMap 来保留请求的顺序,它也会隐式接受承诺:

var usernames = Rx.Observable.from(['user1','user2','user3']);

//Preserve the order of the requests
var subscription = usernames.concatMap(function(name) {
           return Rx.Observable.fromPromise(db.users.find({username:name}).toArray())
                               .filter(function(x) { return x.length == 0; })
                               .map(function() {return name; });
          })
         .first()
         .subscribe(
           function(availableUserName){
             /*Only gets called once*/
           },
           function(e){ 
             /*Will get raised if no names were found or
               there was a problem accessing the database*/
          });

有几件事会根据您的要求而改变:

  1. 如果没有结果不是错误,你应该使用.firstOrDefault() or .filter().take(1)代替。

  2. 在上面的解决方案中,concatMap 可能会启动对所有名称的数据库请求,并简单地保留响应的顺序。如果你想延迟 db 请求的执行直到之前完成你应该用 startAsync:

  3. 包装 db.users.find

return Rx.Observable.startAsync(function() { 
  return db.users.find({username : name}).toArray();
});

编辑 1

修复了之前代码中未将名称值传递给最终订阅者的错误。