通过 akka-stream 作为客户端读取 TCP
reading TCP as client via akka-stream
我是 akka-stream 的新手,正在尝试将该库用作客户端以从远程 TCP 服务器套接字读取(并且仅读取)。
我的尝试失败了,因为没有传入的 ByteStrings 正在处理。
两者都不是:
Source.empty[ByteString]
.via(Tcp().outgoingConnection(address, port))
.to(Sink.foreach(println(_))).run()
也不:
val connection = Tcp().outgoingConnection(address, port)
val sink = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.to(Sink.foreach(println(_)))
connection.
runWith(Source.empty, sink)
有效。
流量不是运行的原因是什么?非常感谢所有提示。
它不是 运行 因为它从来没有任何数据要处理:你有一个空流所以它立即终止。
正如@ViktorKlang 所提到的,客户端的空 Source
导致了流量基本上不会 运行 的问题。您可以将管道设置为仅通过使用单个空 ByteString
Source
而不是空的开始接收,如下例所示:
implicit val system = ActorSystem()
implicit val mater = ActorMaterializer()
val server = Tcp().bind("127.0.0.1", 2555)
server.runForeach{ conn =>
Source.tick(1 second, 1 second, "foo")
.map(f => ByteString(f))
.via(conn.flow)
.to(Sink.ignore).run
}
val clientFlow = Tcp().outgoingConnection(new InetSocketAddress("127.0.0.1", 2555))
clientFlow.runWith(Source(List(ByteString.empty)), Sink.foreach(bs => println("client received: " + bs.utf8String)))
如果你这样做,那么你将开始根据服务器端创建的tick源每秒从服务器接收数据。
您需要一个不完整且不生成任何数据的Source,例如Source.maybe。
请注意,它创建的 Promise 随后可用于终止流程。
Source.maybe[ByteString]
.via(Tcp().outgoingConnection(host, port))
.to(Sink.foreach(println(_))).run()
我是 akka-stream 的新手,正在尝试将该库用作客户端以从远程 TCP 服务器套接字读取(并且仅读取)。
我的尝试失败了,因为没有传入的 ByteStrings 正在处理。
两者都不是:
Source.empty[ByteString]
.via(Tcp().outgoingConnection(address, port))
.to(Sink.foreach(println(_))).run()
也不:
val connection = Tcp().outgoingConnection(address, port)
val sink = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.to(Sink.foreach(println(_)))
connection.
runWith(Source.empty, sink)
有效。
流量不是运行的原因是什么?非常感谢所有提示。
它不是 运行 因为它从来没有任何数据要处理:你有一个空流所以它立即终止。
正如@ViktorKlang 所提到的,客户端的空 Source
导致了流量基本上不会 运行 的问题。您可以将管道设置为仅通过使用单个空 ByteString
Source
而不是空的开始接收,如下例所示:
implicit val system = ActorSystem()
implicit val mater = ActorMaterializer()
val server = Tcp().bind("127.0.0.1", 2555)
server.runForeach{ conn =>
Source.tick(1 second, 1 second, "foo")
.map(f => ByteString(f))
.via(conn.flow)
.to(Sink.ignore).run
}
val clientFlow = Tcp().outgoingConnection(new InetSocketAddress("127.0.0.1", 2555))
clientFlow.runWith(Source(List(ByteString.empty)), Sink.foreach(bs => println("client received: " + bs.utf8String)))
如果你这样做,那么你将开始根据服务器端创建的tick源每秒从服务器接收数据。
您需要一个不完整且不生成任何数据的Source,例如Source.maybe。 请注意,它创建的 Promise 随后可用于终止流程。
Source.maybe[ByteString]
.via(Tcp().outgoingConnection(host, port))
.to(Sink.foreach(println(_))).run()