Akka Stream,Tcp().bind,客户端关闭socket时处理
Akka Stream, Tcp().bind, handle when the client close the socket
我是 Akka Stream 的新手,我想了解如何为我的项目处理 TCP 套接字。我从 Akka Stream official documentation.
中获取了这段代码
import akka.stream.scaladsl.Framing
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
如果我使用 netcat 从终端连接,我可以看到 Akka Stream TCP 套接字按预期工作。我还发现如果我需要使用用户消息关闭连接,我可以使用 takeWhile
如下
import akka.stream.scaladsl.Framing
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.takeWhile(_.toLowerCase.trim != "exit") // < - - - - - - HERE
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
我找不到的是如何管理由 CMD + C
操作关闭的套接字。 Akka Stream 使用 Akka.io 在内部管理 TCP 连接,因此它必须在套接字关闭时发送一些 PeerClose
消息。所以,我对 Akka.io 的理解告诉我,我应该从套接字关闭中收到反馈,但我找不到如何使用 Akka Stream 来做到这一点。有办法管理吗?
connection.handleWith(echo)
是connection.flow.joinMat(echo)(Keep.right).run()
的句法糖,它会有echo
的物化值,一般用处不大。 Flow.via.map.takeWhile
有 NotUsed
作为物化值,所以这也基本上没用。但是,您可以将阶段附加到 echo
,这将以不同的方式实现。
其中之一是 .watchTermination
:
connections.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo: Flow[ByteString, ByteString, Future[Done]] = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.takeWhile(_.toLowerCase.trim != "exit") // < - - - - - - HERE
.map(_ + "!!!\n")
.map(ByteString(_))
// change the materialized value to a Future[Done]
.watchTermination()(Keep.right)
// you may need to have an implicit ExecutionContext in scope, e.g. system.dispatcher,
// if you don't already
connection.handleWith(echo).onComplete {
case Success(_) => println("stream completed successfully")
case Failure(e) => println(e.getMessage)
}
}
这样就不会区分你端还是对端正常关闭连接;它将区分流失败。
我是 Akka Stream 的新手,我想了解如何为我的项目处理 TCP 套接字。我从 Akka Stream official documentation.
中获取了这段代码import akka.stream.scaladsl.Framing
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
如果我使用 netcat 从终端连接,我可以看到 Akka Stream TCP 套接字按预期工作。我还发现如果我需要使用用户消息关闭连接,我可以使用 takeWhile
如下
import akka.stream.scaladsl.Framing
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.takeWhile(_.toLowerCase.trim != "exit") // < - - - - - - HERE
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
我找不到的是如何管理由 CMD + C
操作关闭的套接字。 Akka Stream 使用 Akka.io 在内部管理 TCP 连接,因此它必须在套接字关闭时发送一些 PeerClose
消息。所以,我对 Akka.io 的理解告诉我,我应该从套接字关闭中收到反馈,但我找不到如何使用 Akka Stream 来做到这一点。有办法管理吗?
connection.handleWith(echo)
是connection.flow.joinMat(echo)(Keep.right).run()
的句法糖,它会有echo
的物化值,一般用处不大。 Flow.via.map.takeWhile
有 NotUsed
作为物化值,所以这也基本上没用。但是,您可以将阶段附加到 echo
,这将以不同的方式实现。
其中之一是 .watchTermination
:
connections.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo: Flow[ByteString, ByteString, Future[Done]] = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.takeWhile(_.toLowerCase.trim != "exit") // < - - - - - - HERE
.map(_ + "!!!\n")
.map(ByteString(_))
// change the materialized value to a Future[Done]
.watchTermination()(Keep.right)
// you may need to have an implicit ExecutionContext in scope, e.g. system.dispatcher,
// if you don't already
connection.handleWith(echo).onComplete {
case Success(_) => println("stream completed successfully")
case Failure(e) => println(e.getMessage)
}
}
这样就不会区分你端还是对端正常关闭连接;它将区分流失败。