Alpakka MongoDB - 在 MongoSource 中指定类型
Alpakka MongoDB - specify type in MongoSource
我目前正在使用 Akka Streams 和 Alpakka MongoDB connector。
是否可以指定 MongoSource
的类型?
val codecRegistry = fromRegistries(fromProviders(classOf[TodoMongo]), DEFAULT_CODEC_REGISTRY)
private val todoCollection: MongoCollection[TodoMongo] = mongoDb
.withCodecRegistry(codecRegistry)
.getCollection("todo")
我想做这样的事情:
val t: FindObservable[Seq[TodoMongo]] = todoCollection.find()
MongoSource(t) // Stuck here
但是我得到以下错误:
Expected Observable[scala.Document], Actual FindObservable[Seq[TodoMongo]].
我找不到关于这部分的正确文档。
这还没有发布,但是在Alpakka的master分支中,MongoSource.apply
接受了一个类型参数:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
因此,随着即将发布的 Alpakka 0.18 版本,您将能够执行以下操作:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
注意这里的source
假设todoCollection.find()
returns和Observable[TodoMongo]
;根据需要调整类型。
同时,您可以简单地手动添加上述代码。例如:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
请注意 MyMongoSource
被定义为驻留在 akka.stream.alpakka.mongodb.scaladsl
包中(如 MongoSource
),因为 ObservableToPublisher
是 package-private class.您将以与使用 MongoSource
:
相同的方式使用 MyMongoSource
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())
我目前正在使用 Akka Streams 和 Alpakka MongoDB connector。
是否可以指定 MongoSource
的类型?
val codecRegistry = fromRegistries(fromProviders(classOf[TodoMongo]), DEFAULT_CODEC_REGISTRY)
private val todoCollection: MongoCollection[TodoMongo] = mongoDb
.withCodecRegistry(codecRegistry)
.getCollection("todo")
我想做这样的事情:
val t: FindObservable[Seq[TodoMongo]] = todoCollection.find()
MongoSource(t) // Stuck here
但是我得到以下错误:
Expected Observable[scala.Document], Actual FindObservable[Seq[TodoMongo]].
我找不到关于这部分的正确文档。
这还没有发布,但是在Alpakka的master分支中,MongoSource.apply
接受了一个类型参数:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
因此,随着即将发布的 Alpakka 0.18 版本,您将能够执行以下操作:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
注意这里的source
假设todoCollection.find()
returns和Observable[TodoMongo]
;根据需要调整类型。
同时,您可以简单地手动添加上述代码。例如:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
请注意 MyMongoSource
被定义为驻留在 akka.stream.alpakka.mongodb.scaladsl
包中(如 MongoSource
),因为 ObservableToPublisher
是 package-private class.您将以与使用 MongoSource
:
MyMongoSource
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())