为什么 MongoDB observeOn 不使用指定的执行上下文?
Why does MongoDB observeOn not use the specified execution context?
在 Scala 中,我编写了两个 MongoDB 可观察对象,并在传递自定义执行上下文时调用了 observeOn。在第一个可观察对象上调用 observeOn,但自定义执行上下文不会传播到第二个可观察对象。
为了帮助说明这一点,我编写了以下自包含代码:
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import org.apache.commons.lang3.concurrent.BasicThreadFactory.Builder
import org.mongodb.scala.bson.collection.immutable.Document
import org.mongodb.scala.{MongoClient, Observable}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
object Test extends App {
val client = MongoClient("mongodb://localhost")
def insertObs = {
client.getDatabase("test").getCollection("test").insertOne(Document("test" -> 1))
}
val threadPool = new ThreadPoolExecutor(2, 2, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable],
new Builder().namingPattern("Custom pool").build())
val executionContext = ExecutionContext fromExecutor (threadPool)
val obs = Observable(List(1, 2, 3))
val res =
obs.observeOn(executionContext).map {
i =>
println("OBS " + Thread.currentThread().getName)
i
}.flatMap(_ => insertObs.map {
i =>
println("INSERT " + Thread.currentThread().getName)
i
})
Await.result(res.toFuture(), Duration(20, TimeUnit.SECONDS))
}
输出结果如下:
OBS Custom pool
INSERT Thread-2
OBS Custom pool
INSERT Thread-2
OBS Custom pool
INSERT Thread-4
我只希望 "Custom pool" 用作执行上下文,而不是在执行 "INSERT" 可观察对象时使用 Thread-2 和 Thread-4。如此处文档中所述:
具体来说,它说:为未来的操作使用特定的执行上下文
为什么自定义线程池不用于 "insert" observables?
这似乎按预期工作:
看到这张票:https://jira.mongodb.org/browse/SCALA-437
在 Scala 中,我编写了两个 MongoDB 可观察对象,并在传递自定义执行上下文时调用了 observeOn。在第一个可观察对象上调用 observeOn,但自定义执行上下文不会传播到第二个可观察对象。
为了帮助说明这一点,我编写了以下自包含代码:
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import org.apache.commons.lang3.concurrent.BasicThreadFactory.Builder
import org.mongodb.scala.bson.collection.immutable.Document
import org.mongodb.scala.{MongoClient, Observable}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
object Test extends App {
val client = MongoClient("mongodb://localhost")
def insertObs = {
client.getDatabase("test").getCollection("test").insertOne(Document("test" -> 1))
}
val threadPool = new ThreadPoolExecutor(2, 2, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable],
new Builder().namingPattern("Custom pool").build())
val executionContext = ExecutionContext fromExecutor (threadPool)
val obs = Observable(List(1, 2, 3))
val res =
obs.observeOn(executionContext).map {
i =>
println("OBS " + Thread.currentThread().getName)
i
}.flatMap(_ => insertObs.map {
i =>
println("INSERT " + Thread.currentThread().getName)
i
})
Await.result(res.toFuture(), Duration(20, TimeUnit.SECONDS))
}
输出结果如下:
OBS Custom pool
INSERT Thread-2
OBS Custom pool
INSERT Thread-2
OBS Custom pool
INSERT Thread-4
我只希望 "Custom pool" 用作执行上下文,而不是在执行 "INSERT" 可观察对象时使用 Thread-2 和 Thread-4。如此处文档中所述:
具体来说,它说:为未来的操作使用特定的执行上下文
为什么自定义线程池不用于 "insert" observables?
这似乎按预期工作: 看到这张票:https://jira.mongodb.org/browse/SCALA-437