scala observable unify observable with a sequence 无需中间数据结构更新
scala observable unify observable with a sequence without intermediate datastructure update
我有一个代码调用 couchbase 来获取一些行,如下所示:
val gotValues: Observable[JsonDocument] = Observable.from(rowKeys).flatMap(id =>
couchbaseBucket.async().get(id))
如果我有 1,2,3,4,5,6 作为输入行键,并且数据库中只存在第 1,2,3 行,那么可观察对象只会收到关于 1,2,3 的通知。
然而,我的要求是我 return 一个 1、2、3 为真(存在于数据库中)和 4、5、6 为假(意味着不存在于数据库中)的映射。我设法用 scala observable 做到了这一点,但是我正在使用中间地图数据结构 return 包含所有 ID 的总地图。下面是模拟我的问题的示例代码..
object Main extends App {
import rx.lang.scala.Observable
val idsToFetch = Seq(1,2,3,4,5,6)
println(isInDBOrNot()) // {1=true, 2=true, 3=true, 4=false, 5=false, 6=false}
private def isInDBOrNot(): ConcurrentHashMap[Int, Boolean] = {
val inAndNotInDB = new java.util.concurrent.ConcurrentHashMap[Int, Boolean]
// - How can I avoid the additional data structure?
// - In this case a map, so that the function will return
// a map with all numbers and for each if exist in DB?
// - I mean I want the function to return a map I don't
// want to populate that map inside the observer,
// it's like a mini side effect I would rather simply
// manipulate the stream.
Observable.from(idsToFetch)
.filterNot(x => x == 4 || x == 5 || x == 6) // Simulate fetch from DB, 4,5,6 do not exist in DB, so not returned.
.subscribe(
x => inAndNotInDB.put(x, true),
e => println(e),
() => idsToFetch.filterNot(inAndNotInDB.containsKey)
.foreach(inAndNotInDB.put(_, false)) // mark all non-found as false.
)
inAndNotInDB
}
}
无论如何要在没有中间映射的情况下做到这一点(不填充中间数据结构,而仅通过操纵流)? 看起来不干净!!。谢谢
执行此操作的一种方法如下:
(1) 将 id 序列转换为 Observable
并 map
它与
id => (id, false)
...所以你会得到一个 Observable[(Int, Boolean)]
类型的可观察对象(我们称这个新的可观察对象为 first
)。
(2) 从数据库中获取数据并且 map
每个获取的行来自:
(some_id, true)
... 在 Observable[(Int, Boolean)]
内部(我们称其为可观察的 last
)
(3) concat first
和 last
.
(4) toMap (3) 的结果。来自 first
的重复元素将在处理过程中被丢弃。 (这将是你的 resultObsrvable
)
(5) (可能)收集可观察对象(您的地图)的第一个也是唯一一个元素。你可能根本不想这样做,但如果你这样做,你应该真正理解此时阻止收集结果的含义。无论如何,这一步实际上取决于您的应用程序细节(threading\scheduling\io 是如何组织的)但是蛮力方法应该看起来像这样(请参阅 this demo 了解更多细节):
Await.result(resultObsrvable.toBlocking.toFuture, 2 seconds)
这个怎么样
Observable.from(idsToFetch)
.filterNot(x => x._1 == 4 || x._1 == 5 || x._1 == 6)
.foldLeft(idToFetch.map{_->false}.toMap){(m,id)=>m+(id->true)}
你的问题似乎是由你使用 flatMap
引起的,所以如果数据库中没有给定 id
的数据并且你得到一个空的 Observable
,flatMap
不会为这样的 id
产生任何输出。所以看起来你需要的是defaultIfEmpty which is translated to Scala's orElse
。您可以使用 orElse
到 return flatMap
中的一些默认值。所以修改你的例子:
def fetchFromDb(id: Int): Observable[String] = {
if (id <= 3)
Observable.just(s"Document #$id")
else
Observable.empty
}
def gotValue(idsToFetch: Seq[Int]): Observable[(Int, Boolean)] = {
Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(_ => (id, true)).orElse((id, false)))
}
println(gotValue(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)
打印
List((1,true), (2,true), (3,true), (4,false), (5,false), (6,false))
或者你可以用Option
到returnSome(JsonDocument)
或者None
比如
def gotValueEx(idsToFetch: Seq[Int]): Observable[(Int, Option[String])] = {
Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(doc => (id, Option(doc))).orElse((id, None)))
}
println(gotValueEx(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)
打印
List((1,Some(Document #1)), (2,Some(Document #2)), (3,Some(Document #3)), (4,None), (5,None), (6,None))
我有一个代码调用 couchbase 来获取一些行,如下所示:
val gotValues: Observable[JsonDocument] = Observable.from(rowKeys).flatMap(id =>
couchbaseBucket.async().get(id))
如果我有 1,2,3,4,5,6 作为输入行键,并且数据库中只存在第 1,2,3 行,那么可观察对象只会收到关于 1,2,3 的通知。
然而,我的要求是我 return 一个 1、2、3 为真(存在于数据库中)和 4、5、6 为假(意味着不存在于数据库中)的映射。我设法用 scala observable 做到了这一点,但是我正在使用中间地图数据结构 return 包含所有 ID 的总地图。下面是模拟我的问题的示例代码..
object Main extends App {
import rx.lang.scala.Observable
val idsToFetch = Seq(1,2,3,4,5,6)
println(isInDBOrNot()) // {1=true, 2=true, 3=true, 4=false, 5=false, 6=false}
private def isInDBOrNot(): ConcurrentHashMap[Int, Boolean] = {
val inAndNotInDB = new java.util.concurrent.ConcurrentHashMap[Int, Boolean]
// - How can I avoid the additional data structure?
// - In this case a map, so that the function will return
// a map with all numbers and for each if exist in DB?
// - I mean I want the function to return a map I don't
// want to populate that map inside the observer,
// it's like a mini side effect I would rather simply
// manipulate the stream.
Observable.from(idsToFetch)
.filterNot(x => x == 4 || x == 5 || x == 6) // Simulate fetch from DB, 4,5,6 do not exist in DB, so not returned.
.subscribe(
x => inAndNotInDB.put(x, true),
e => println(e),
() => idsToFetch.filterNot(inAndNotInDB.containsKey)
.foreach(inAndNotInDB.put(_, false)) // mark all non-found as false.
)
inAndNotInDB
}
}
无论如何要在没有中间映射的情况下做到这一点(不填充中间数据结构,而仅通过操纵流)? 看起来不干净!!。谢谢
执行此操作的一种方法如下:
(1) 将 id 序列转换为 Observable
并 map
它与
id => (id, false)
...所以你会得到一个 Observable[(Int, Boolean)]
类型的可观察对象(我们称这个新的可观察对象为 first
)。
(2) 从数据库中获取数据并且 map
每个获取的行来自:
(some_id, true)
... 在 Observable[(Int, Boolean)]
内部(我们称其为可观察的 last
)
(3) concat first
和 last
.
(4) toMap (3) 的结果。来自 first
的重复元素将在处理过程中被丢弃。 (这将是你的 resultObsrvable
)
(5) (可能)收集可观察对象(您的地图)的第一个也是唯一一个元素。你可能根本不想这样做,但如果你这样做,你应该真正理解此时阻止收集结果的含义。无论如何,这一步实际上取决于您的应用程序细节(threading\scheduling\io 是如何组织的)但是蛮力方法应该看起来像这样(请参阅 this demo 了解更多细节):
Await.result(resultObsrvable.toBlocking.toFuture, 2 seconds)
这个怎么样
Observable.from(idsToFetch)
.filterNot(x => x._1 == 4 || x._1 == 5 || x._1 == 6)
.foldLeft(idToFetch.map{_->false}.toMap){(m,id)=>m+(id->true)}
你的问题似乎是由你使用 flatMap
引起的,所以如果数据库中没有给定 id
的数据并且你得到一个空的 Observable
,flatMap
不会为这样的 id
产生任何输出。所以看起来你需要的是defaultIfEmpty which is translated to Scala's orElse
。您可以使用 orElse
到 return flatMap
中的一些默认值。所以修改你的例子:
def fetchFromDb(id: Int): Observable[String] = {
if (id <= 3)
Observable.just(s"Document #$id")
else
Observable.empty
}
def gotValue(idsToFetch: Seq[Int]): Observable[(Int, Boolean)] = {
Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(_ => (id, true)).orElse((id, false)))
}
println(gotValue(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)
打印
List((1,true), (2,true), (3,true), (4,false), (5,false), (6,false))
或者你可以用Option
到returnSome(JsonDocument)
或者None
比如
def gotValueEx(idsToFetch: Seq[Int]): Observable[(Int, Option[String])] = {
Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(doc => (id, Option(doc))).orElse((id, None)))
}
println(gotValueEx(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)
打印
List((1,Some(Document #1)), (2,Some(Document #2)), (3,Some(Document #3)), (4,None), (5,None), (6,None))