将 TCP 服务器的输出捕获到 Akka Stream 队列的最推荐方法是什么?

What is the most recommended way to capture TCP Server's output to an Akka Stream queue?

我正在试验 Akka Streams,以期准确了解应该如何使用 TCP 服务器从客户端接收的内容(服务器不需要响应客户端)。

这是一个标准的 TCP 服务器实现(在应用我从@heiko-seeberger 的简洁解释 中理解的内容之后):

def runServer(system: ActorSystem, address: String, port: Int, collectingSink: Sink[ByteString,NotUsed]): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    val handler = Sink.foreach[IncomingConnection] { conn =>
      conn.handleWith(
        Flow[ByteString]
          .via(JsonFraming.objectScanner(maximumObjectLength = 400))
          .alsoTo(collectingSink)
          .map(b => ByteString.empty)
          .filter(_ == false)
      )
    }

    val connections = Tcp().bind(address, port)
    val binding = connections.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.terminate()
    }
  }

我作为 collectingSink 参数传递给 运行Server() 函数的值是这样构造的:

import akka.stream.scaladsl.{Flow, JsonFraming, Sink}
import akka.util.ByteString
import play.api.libs.json.Json

object DeviceDataProcessor {

      case class Readings (
                     radiationLevel: Double,
                     ambientTemp: Double,
                     photoSensor: Double,
                     humidity: Double,
                     sensorUUID: String,
                     timestampAttached: Long)

      val xformToDeviceReadings = Flow[ByteString]
        .via(JsonFraming.objectScanner(maximumObjectLength = 400))
        .map(b => {

            val jsonified = Json.parse(b.utf8String)
            val readings  =  Readings(
                           (jsonified \ "radiation_level")         .as[Double],
                           (jsonified \ "ambient_temperature")     .as[Double],
                           (jsonified \ "photosensor")             .as[Double],
                           (jsonified \ "humidity")                .as[Double],
                           (jsonified \ "sensor_uuid")             .as[String],
                           (jsonified \ "timestamp")               .as[Long]
            )
            readings
        })
        .to(Sink.queue())

    }

最后,这就是我 运行 我的 Driver:

object ConsumerDriver extends App {
       val actorSystem = ActorSystem("ServerSide")
       TCPServer.runServer(actorSystem,"127.0.0.1", 9899,DeviceDataProcessor.xformToDeviceReadings)
}

我没能理解这里两件事背后的原因:

1) xformToDeviceReadings 的类型派生为

Sink[ByteStream,NotUsed]

映射类型 Readings 不应该出现在这里吗?

2) 如何开始从此队列读取并将元素传递到另一个上游流?我是否必须先具体化然后使用具体化队列作为我的新 Source?

我已经阅读了 Akka 站点上的文档。但是,我很乐意被重定向到本文档的任何特定部分,或 SO 上的其他帖子。

请帮我填补概念上的空白。

1) 类型是 Sink[ByteString,NotUsed] 因为 Akka Streams 在组合物化值时是左偏的。这意味着组合接收器 (xformToDeviceReadings) 公开的物化值是来自其第一阶段的值(map,物化为 NotUsed)。

要公开您想要的物化值,您需要更改为

...
.toMat(Sink.queue())(Keep.right)

请注意,您的接收器类型现在更改为 Sink[ByteString, SinkQueueWithCancel[Readings]]

2) 要与您的 Readings 队列交互,您需要 运行 您的流,从而获得您的物化值(队列)并开始从中拉出物品。这可能发生在连接处理中:

val handler: Sink[IncomingConnection, Future[Done]] = Flow[IncomingConnection]
  .map { conn =>
    conn.handleWith(
      Flow[ByteString]
        .via(JsonFraming.objectScanner(maximumObjectLength = 400))
        .alsoToMat(collectingSink)(Keep.right)
        .map(_ => ByteString.empty)
        .filter(_ == false)
    )
  }
  .mapAsync(1){ queue ⇒
    queue.pull()
  }
  .toMat(Sink.foreach(println))(Keep.right)

请注意,上述解决方案并不理想,尤其是因为物化队列很可能不是线程安全的。如果您的目的是将这些 Readings 连接到其他下游流,您最好直接连接一个适合此目的的 Sink,而不是通过队列。