仅调用 onComplete 时映射 scala rx observables
mapping over scala rx observables when only onComplete is called
我正在使用 scala observables 从 couchbase 获取项目,然后我正在使用 map、flatMap、zip 来转换结果。问题是,如果 couchbase 中不存在某个项目,那么例如 .zip
不会仅调用 onComplete。示例:
import rx.lang.scala._
def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
val values = Observable.from(keyValueIds).flatMap(id => couchbaseBucket.async().get(id))
values.zip(Observable.from(ids)) // zip is not called if no row in couchbase with id.
...
}
所以我想要:
- Return k -> v
的映射
- 我让 .zip 将 k 耦合到返回的 v(如果不存在,我希望 v 类似于
None
。
- 我看到如果 db 中不存在任何项目,则根本不会调用 zip。
我想到在 运行 上面的代码之后,扫描 ids
输入参数,并为每个没有压缩值的参数添加一个 id 到它的值,但这就像添加另一个流,我希望 zip 能够处理现有和不存在的行。
我该如何处理?我怎样才能让 .zip
处理现有的和不存在的行?
不要使用 zip()
运算符。相反,只需使用 flatMap()
和 materialize().take(1)
。 materialize()
会将 onComplete()
事件转换为可以映射到 None
的 Notification
,而具有值的 Notification
将映射到 Some(value)
.
def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
val values = Observable.from(keyValueIds)
.flatMap(id => couchbaseBucket.async()
.get(id)
.materialize()
.take(1)
.map( res => if ( res.isOnComplete() )
(id, None)
else
(id, Some(res.getValue))
...
}
我正在使用 scala observables 从 couchbase 获取项目,然后我正在使用 map、flatMap、zip 来转换结果。问题是,如果 couchbase 中不存在某个项目,那么例如 .zip
不会仅调用 onComplete。示例:
import rx.lang.scala._
def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
val values = Observable.from(keyValueIds).flatMap(id => couchbaseBucket.async().get(id))
values.zip(Observable.from(ids)) // zip is not called if no row in couchbase with id.
...
}
所以我想要:
- Return k -> v 的映射
- 我让 .zip 将 k 耦合到返回的 v(如果不存在,我希望 v 类似于
None
。 - 我看到如果 db 中不存在任何项目,则根本不会调用 zip。
我想到在 运行 上面的代码之后,扫描 ids
输入参数,并为每个没有压缩值的参数添加一个 id 到它的值,但这就像添加另一个流,我希望 zip 能够处理现有和不存在的行。
我该如何处理?我怎样才能让 .zip
处理现有的和不存在的行?
不要使用 zip()
运算符。相反,只需使用 flatMap()
和 materialize().take(1)
。 materialize()
会将 onComplete()
事件转换为可以映射到 None
的 Notification
,而具有值的 Notification
将映射到 Some(value)
.
def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
val values = Observable.from(keyValueIds)
.flatMap(id => couchbaseBucket.async()
.get(id)
.materialize()
.take(1)
.map( res => if ( res.isOnComplete() )
(id, None)
else
(id, Some(res.getValue))
...
}