阿卡 HTTP。来自回调的流媒体源

Akka HTTP. Streaming source from callback

我正在尝试连接 Akka HTTP 和一些旧的 Java 库。该库有两种方法——一种接受回调函数来接收字符串,另一种表示数据流结束。接收数据的回调函数可以多次调用。考虑这个片段:

   oldJavaLib.receiveData((s:String) => {
       println("received:" + s)
   })

   oldJavaLib.dataEnd(() => {
       println("data transmission is over")
   })

我想使用 Akka HTTP 流式传输数据,因为它被回调函数接收。但我不确定什么是最好的方法。
我想创建一个流,然后像这样在 HTTP 路由中直接使用它:

  def fetchUsers(): Source[User, NotUsed] = Source.fromIterator(() => Iterator.fill(1000000) {
    val id = Random.nextInt()
    dummyUser(id.toString)
  })
  
  lazy val routes: Route =
      pathPrefix("test") {
        concat(
          pathEnd {
            concat(
              get {
                complete(fetchUsers())
              }
            )
          }
        )
      }  

fetchUsers() 函数应该 return 一个从遗留 java API 获取数据的流。也许有更好的方法。

我假设您想创建一个 Akka 流来从回调中发出值?您可以使用 Source.queue。对于第一个回调,它将是:

val queue = Source.queue[String](bufferSize = 1000)
                .toMat(Sink.ignore)(Keep.left)
                .run()
oldJavaLib.receiveData((s: String) => {
   queue.offer(s) match {
       case Enqueued => println("received:" + s)
       case _ => println("failed to enqueue:" + s)
   }
})

问题澄清后编辑

如果您想在 HTTP 路由中使用源,您必须 prematerialize it。参考我以前的代码,它看起来像这样:

val (queue, source) = Source.queue[String](bufferSize = 1000).preMaterialize()

source然后可以在任何路由中使用。