您将如何更改此 Akka Streams 示例以获得物化值 Future[ServerBinding]?
How would you change this Akka Streams example to get the materialized value Future[ServerBinding]?
http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-io.html 中的第二个示例如下。我添加了 host
& port
.
的定义
val host = "localhost"
val port = 4444
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections runForeach { (connection: IncomingConnection) =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
您将如何更改示例以便您可以 也 获得 Future[ServerBinding]
以便您可以记录成功的绑定并处理绑定错误,如果,例如,该端口已被使用?
您应该能够通过摆脱 runForeach
的便利性来实现这一点,因为 runForeach
隐藏了所涉及的 Sink
的详细信息,而是使用明确的 Sink
。像这样:
val host = "localhost"
val port = 4444
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
val sink = Sink.foreach[IncomingConnection]{ connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
val serverBinding:Future[ServerBinding] = connections.to(sink).run
http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-io.html 中的第二个示例如下。我添加了 host
& port
.
val host = "localhost"
val port = 4444
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections runForeach { (connection: IncomingConnection) =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
您将如何更改示例以便您可以 也 获得 Future[ServerBinding]
以便您可以记录成功的绑定并处理绑定错误,如果,例如,该端口已被使用?
您应该能够通过摆脱 runForeach
的便利性来实现这一点,因为 runForeach
隐藏了所涉及的 Sink
的详细信息,而是使用明确的 Sink
。像这样:
val host = "localhost"
val port = 4444
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
val sink = Sink.foreach[IncomingConnection]{ connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
val serverBinding:Future[ServerBinding] = connections.to(sink).run