您将如何更改此 Akka Streams 示例以获得物化值 Future[ServerBinding]?

How would you change this Akka Streams example to get the materialized value Future[ServerBinding]?

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-io.html 中的第二个示例如下。我添加了 host & port.

的定义
  val host = "localhost"
  val port = 4444
  val connections: Source[IncomingConnection, Future[ServerBinding]] =
    Tcp().bind(host, port)

  connections runForeach { (connection: IncomingConnection) =>
    println(s"New connection from: ${connection.remoteAddress}")

    val echo = Flow[ByteString]
      .via(Framing.delimiter(
        ByteString("\n"),
        maximumFrameLength = 256,
        allowTruncation = true))
      .map(_.utf8String)
      .map(_ + "!!!\n")
      .map(ByteString(_))

    connection.handleWith(echo)
  }

您将如何更改示例以便您可以 获得 Future[ServerBinding] 以便您可以记录成功的绑定并处理绑定错误,如果,例如,该端口已被使用?

您应该能够通过摆脱 runForeach 的便利性来实现这一点,因为 runForeach 隐藏了所涉及的 Sink 的详细信息,而是使用明确的 Sink。像这样:

  val host = "localhost"
  val port = 4444
  val connections: Source[IncomingConnection, Future[ServerBinding]] =
    Tcp().bind(host, port)


  val sink = Sink.foreach[IncomingConnection]{ connection =>
    println(s"New connection from: ${connection.remoteAddress}")

    val echo = Flow[ByteString]
      .via(Framing.delimiter(
        ByteString("\n"),
        maximumFrameLength = 256,
        allowTruncation = true))
      .map(_.utf8String)
      .map(_ + "!!!\n")
      .map(ByteString(_))

    connection.handleWith(echo)    
  }

  val serverBinding:Future[ServerBinding] = connections.to(sink).run