仅调用 onComplete 时映射 scala rx observables

mapping over scala rx observables when only onComplete is called

我正在使用 scala observables 从 couchbase 获取项目,然后我正在使用 map、flatMap、zip 来转换结果。问题是,如果 couchbase 中不存在某个项目,那么例如 .zip 不会仅调用 onComplete。示例:

import rx.lang.scala._

def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
  val values = Observable.from(keyValueIds).flatMap(id => couchbaseBucket.async().get(id))
  values.zip(Observable.from(ids)) // zip is not called if no row in couchbase with id.
  ...
}

所以我想要:

  1. Return k -> v
  2. 的映射
  3. 我让 .zip 将 k 耦合到返回的 v(如果不存在,我希望 v 类似于 None
  4. 我看到如果 db 中不存在任何项目,则根本不会调用 zip。

我想到在 运行 上面的代码之后,扫描 ids 输入参数,并为每个没有压缩值的参数添加一个 id 到它的值,但这就像添加另一个流,我希望 zip 能够处理现有和不存在的行。

我该如何处理?我怎样才能让 .zip 处理现有的和不存在的行?

不要使用 zip() 运算符。相反,只需使用 flatMap()materialize().take(1)materialize() 会将 onComplete() 事件转换为可以映射到 NoneNotification,而具有值的 Notification 将映射到 Some(value).

def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
  val values = Observable.from(keyValueIds)
   .flatMap(id => couchbaseBucket.async()
     .get(id)
     .materialize()
     .take(1)
     .map( res => if ( res.isOnComplete() )
                     (id, None)
                  else 
                     (id, Some(res.getValue))
  ...
}