阿卡 HTTP。来自回调的流媒体源
Akka HTTP. Streaming source from callback
我正在尝试连接 Akka HTTP 和一些旧的 Java 库。该库有两种方法——一种接受回调函数来接收字符串,另一种表示数据流结束。接收数据的回调函数可以多次调用。考虑这个片段:
oldJavaLib.receiveData((s:String) => {
println("received:" + s)
})
oldJavaLib.dataEnd(() => {
println("data transmission is over")
})
我想使用 Akka HTTP 流式传输数据,因为它被回调函数接收。但我不确定什么是最好的方法。
我想创建一个流,然后像这样在 HTTP 路由中直接使用它:
def fetchUsers(): Source[User, NotUsed] = Source.fromIterator(() => Iterator.fill(1000000) {
val id = Random.nextInt()
dummyUser(id.toString)
})
lazy val routes: Route =
pathPrefix("test") {
concat(
pathEnd {
concat(
get {
complete(fetchUsers())
}
)
}
)
}
fetchUsers()
函数应该 return 一个从遗留 java API 获取数据的流。也许有更好的方法。
我假设您想创建一个 Akka 流来从回调中发出值?您可以使用 Source.queue。对于第一个回调,它将是:
val queue = Source.queue[String](bufferSize = 1000)
.toMat(Sink.ignore)(Keep.left)
.run()
oldJavaLib.receiveData((s: String) => {
queue.offer(s) match {
case Enqueued => println("received:" + s)
case _ => println("failed to enqueue:" + s)
}
})
问题澄清后编辑
如果您想在 HTTP 路由中使用源,您必须 prematerialize it。参考我以前的代码,它看起来像这样:
val (queue, source) = Source.queue[String](bufferSize = 1000).preMaterialize()
source
然后可以在任何路由中使用。
我正在尝试连接 Akka HTTP 和一些旧的 Java 库。该库有两种方法——一种接受回调函数来接收字符串,另一种表示数据流结束。接收数据的回调函数可以多次调用。考虑这个片段:
oldJavaLib.receiveData((s:String) => {
println("received:" + s)
})
oldJavaLib.dataEnd(() => {
println("data transmission is over")
})
我想使用 Akka HTTP 流式传输数据,因为它被回调函数接收。但我不确定什么是最好的方法。
我想创建一个流,然后像这样在 HTTP 路由中直接使用它:
def fetchUsers(): Source[User, NotUsed] = Source.fromIterator(() => Iterator.fill(1000000) {
val id = Random.nextInt()
dummyUser(id.toString)
})
lazy val routes: Route =
pathPrefix("test") {
concat(
pathEnd {
concat(
get {
complete(fetchUsers())
}
)
}
)
}
fetchUsers()
函数应该 return 一个从遗留 java API 获取数据的流。也许有更好的方法。
我假设您想创建一个 Akka 流来从回调中发出值?您可以使用 Source.queue。对于第一个回调,它将是:
val queue = Source.queue[String](bufferSize = 1000)
.toMat(Sink.ignore)(Keep.left)
.run()
oldJavaLib.receiveData((s: String) => {
queue.offer(s) match {
case Enqueued => println("received:" + s)
case _ => println("failed to enqueue:" + s)
}
})
问题澄清后编辑
如果您想在 HTTP 路由中使用源,您必须 prematerialize it。参考我以前的代码,它看起来像这样:
val (queue, source) = Source.queue[String](bufferSize = 1000).preMaterialize()
source
然后可以在任何路由中使用。