将 TCP 服务器的输出捕获到 Akka Stream 队列的最推荐方法是什么?
What is the most recommended way to capture TCP Server's output to an Akka Stream queue?
我正在试验 Akka Streams,以期准确了解应该如何使用 TCP 服务器从客户端接收的内容(服务器不需要响应客户端)。
这是一个标准的 TCP 服务器实现(在应用我从@heiko-seeberger 的简洁解释 中理解的内容之后):
def runServer(system: ActorSystem, address: String, port: Int, collectingSink: Sink[ByteString,NotUsed]): Unit = {
implicit val sys = system
import system.dispatcher
implicit val materializer = ActorMaterializer()
val handler = Sink.foreach[IncomingConnection] { conn =>
conn.handleWith(
Flow[ByteString]
.via(JsonFraming.objectScanner(maximumObjectLength = 400))
.alsoTo(collectingSink)
.map(b => ByteString.empty)
.filter(_ == false)
)
}
val connections = Tcp().bind(address, port)
val binding = connections.to(handler).run()
binding.onComplete {
case Success(b) =>
println("Server started, listening on: " + b.localAddress)
case Failure(e) =>
println(s"Server could not bind to $address:$port: ${e.getMessage}")
system.terminate()
}
}
我作为 collectingSink 参数传递给 运行Server() 函数的值是这样构造的:
import akka.stream.scaladsl.{Flow, JsonFraming, Sink}
import akka.util.ByteString
import play.api.libs.json.Json
object DeviceDataProcessor {
case class Readings (
radiationLevel: Double,
ambientTemp: Double,
photoSensor: Double,
humidity: Double,
sensorUUID: String,
timestampAttached: Long)
val xformToDeviceReadings = Flow[ByteString]
.via(JsonFraming.objectScanner(maximumObjectLength = 400))
.map(b => {
val jsonified = Json.parse(b.utf8String)
val readings = Readings(
(jsonified \ "radiation_level") .as[Double],
(jsonified \ "ambient_temperature") .as[Double],
(jsonified \ "photosensor") .as[Double],
(jsonified \ "humidity") .as[Double],
(jsonified \ "sensor_uuid") .as[String],
(jsonified \ "timestamp") .as[Long]
)
readings
})
.to(Sink.queue())
}
最后,这就是我 运行 我的 Driver:
object ConsumerDriver extends App {
val actorSystem = ActorSystem("ServerSide")
TCPServer.runServer(actorSystem,"127.0.0.1", 9899,DeviceDataProcessor.xformToDeviceReadings)
}
我没能理解这里两件事背后的原因:
1) xformToDeviceReadings 的类型派生为
Sink[ByteStream,NotUsed]
映射类型 Readings 不应该出现在这里吗?
2) 如何开始从此队列读取并将元素传递到另一个上游流?我是否必须先具体化然后使用具体化队列作为我的新 Source?
我已经阅读了 Akka 站点上的文档。但是,我很乐意被重定向到本文档的任何特定部分,或 SO 上的其他帖子。
请帮我填补概念上的空白。
1) 类型是 Sink[ByteString,NotUsed]
因为 Akka Streams 在组合物化值时是左偏的。这意味着组合接收器 (xformToDeviceReadings
) 公开的物化值是来自其第一阶段的值(map
,物化为 NotUsed
)。
要公开您想要的物化值,您需要更改为
...
.toMat(Sink.queue())(Keep.right)
请注意,您的接收器类型现在更改为 Sink[ByteString, SinkQueueWithCancel[Readings]]
2) 要与您的 Readings
队列交互,您需要 运行 您的流,从而获得您的物化值(队列)并开始从中拉出物品。这可能发生在连接处理中:
val handler: Sink[IncomingConnection, Future[Done]] = Flow[IncomingConnection]
.map { conn =>
conn.handleWith(
Flow[ByteString]
.via(JsonFraming.objectScanner(maximumObjectLength = 400))
.alsoToMat(collectingSink)(Keep.right)
.map(_ => ByteString.empty)
.filter(_ == false)
)
}
.mapAsync(1){ queue ⇒
queue.pull()
}
.toMat(Sink.foreach(println))(Keep.right)
请注意,上述解决方案并不理想,尤其是因为物化队列很可能不是线程安全的。如果您的目的是将这些 Readings
连接到其他下游流,您最好直接连接一个适合此目的的 Sink
,而不是通过队列。
我正在试验 Akka Streams,以期准确了解应该如何使用 TCP 服务器从客户端接收的内容(服务器不需要响应客户端)。
这是一个标准的 TCP 服务器实现(在应用我从@heiko-seeberger 的简洁解释
def runServer(system: ActorSystem, address: String, port: Int, collectingSink: Sink[ByteString,NotUsed]): Unit = {
implicit val sys = system
import system.dispatcher
implicit val materializer = ActorMaterializer()
val handler = Sink.foreach[IncomingConnection] { conn =>
conn.handleWith(
Flow[ByteString]
.via(JsonFraming.objectScanner(maximumObjectLength = 400))
.alsoTo(collectingSink)
.map(b => ByteString.empty)
.filter(_ == false)
)
}
val connections = Tcp().bind(address, port)
val binding = connections.to(handler).run()
binding.onComplete {
case Success(b) =>
println("Server started, listening on: " + b.localAddress)
case Failure(e) =>
println(s"Server could not bind to $address:$port: ${e.getMessage}")
system.terminate()
}
}
我作为 collectingSink 参数传递给 运行Server() 函数的值是这样构造的:
import akka.stream.scaladsl.{Flow, JsonFraming, Sink}
import akka.util.ByteString
import play.api.libs.json.Json
object DeviceDataProcessor {
case class Readings (
radiationLevel: Double,
ambientTemp: Double,
photoSensor: Double,
humidity: Double,
sensorUUID: String,
timestampAttached: Long)
val xformToDeviceReadings = Flow[ByteString]
.via(JsonFraming.objectScanner(maximumObjectLength = 400))
.map(b => {
val jsonified = Json.parse(b.utf8String)
val readings = Readings(
(jsonified \ "radiation_level") .as[Double],
(jsonified \ "ambient_temperature") .as[Double],
(jsonified \ "photosensor") .as[Double],
(jsonified \ "humidity") .as[Double],
(jsonified \ "sensor_uuid") .as[String],
(jsonified \ "timestamp") .as[Long]
)
readings
})
.to(Sink.queue())
}
最后,这就是我 运行 我的 Driver:
object ConsumerDriver extends App {
val actorSystem = ActorSystem("ServerSide")
TCPServer.runServer(actorSystem,"127.0.0.1", 9899,DeviceDataProcessor.xformToDeviceReadings)
}
我没能理解这里两件事背后的原因:
1) xformToDeviceReadings 的类型派生为
Sink[ByteStream,NotUsed]
映射类型 Readings 不应该出现在这里吗?
2) 如何开始从此队列读取并将元素传递到另一个上游流?我是否必须先具体化然后使用具体化队列作为我的新 Source?
我已经阅读了 Akka 站点上的文档。但是,我很乐意被重定向到本文档的任何特定部分,或 SO 上的其他帖子。
请帮我填补概念上的空白。
1) 类型是 Sink[ByteString,NotUsed]
因为 Akka Streams 在组合物化值时是左偏的。这意味着组合接收器 (xformToDeviceReadings
) 公开的物化值是来自其第一阶段的值(map
,物化为 NotUsed
)。
要公开您想要的物化值,您需要更改为
...
.toMat(Sink.queue())(Keep.right)
请注意,您的接收器类型现在更改为 Sink[ByteString, SinkQueueWithCancel[Readings]]
2) 要与您的 Readings
队列交互,您需要 运行 您的流,从而获得您的物化值(队列)并开始从中拉出物品。这可能发生在连接处理中:
val handler: Sink[IncomingConnection, Future[Done]] = Flow[IncomingConnection]
.map { conn =>
conn.handleWith(
Flow[ByteString]
.via(JsonFraming.objectScanner(maximumObjectLength = 400))
.alsoToMat(collectingSink)(Keep.right)
.map(_ => ByteString.empty)
.filter(_ == false)
)
}
.mapAsync(1){ queue ⇒
queue.pull()
}
.toMat(Sink.foreach(println))(Keep.right)
请注意,上述解决方案并不理想,尤其是因为物化队列很可能不是线程安全的。如果您的目的是将这些 Readings
连接到其他下游流,您最好直接连接一个适合此目的的 Sink
,而不是通过队列。