在 Akka-Streams 中拆分流
Splitting inside a flow in Akka-Streams
我正在尝试提出一种解决方案,将我接收到的传入字符串拆分为多个字符串。我一直在研究,看起来在以前版本的 Akka-Streams 中有一个 class Transformer
,你可以扩展它来进行这种转换。
在我使用的版本 (RC2) 中有 Stage
s 但我不太确定如何实现拆分模式。
Source.actorPublisher[String](MyActor.props).
.XXXXX(_.split("\n"))
.map(...)
.to(Sink(...))
我正在寻找 XXXXX
组件,它允许我输入一个 String
和 return 一个 String
的序列,并将每一个发送给其余的流量。
我同意@jrudolph 的观点,mapConcat
可能就是您要找的。一个简单的例子展示了这个方法的实际应用:
val strings = List(
"""hello
world
test
this""",
"""foo
bar
baz
"""
)
implicit val system = ActorSystem("test")
implicit val mater = ActorFlowMaterializer()
Source(strings).
mapConcat(_.split("\n").map(_.trim).toList).
runForeach(println)
如果您运行此代码,您将看到以下打印出来的内容:
hello
world
test
this
foo
bar
baz
Akka 为此类问题提供了 Framing
辅助函数。
假设您的字符集是 UTF-8,您可以编写一个函数,它接受分隔的 String
值的最大大小和 returns 可以执行拆分的 Flow
:
import akka.stream.scaladsl.Framing
import akka.util.ByteString
val newLineSplitter : (Int) => Flow[String, String, NotUsed] =
(maxLineSize) =>
Flow[String]
.map(ByteString.apply)
.via(Framing delimiter (ByteString("\n"), maxLineSize))
.via(Flow[ByteString] map (_.utf8String))
我正在尝试提出一种解决方案,将我接收到的传入字符串拆分为多个字符串。我一直在研究,看起来在以前版本的 Akka-Streams 中有一个 class Transformer
,你可以扩展它来进行这种转换。
在我使用的版本 (RC2) 中有 Stage
s 但我不太确定如何实现拆分模式。
Source.actorPublisher[String](MyActor.props).
.XXXXX(_.split("\n"))
.map(...)
.to(Sink(...))
我正在寻找 XXXXX
组件,它允许我输入一个 String
和 return 一个 String
的序列,并将每一个发送给其余的流量。
我同意@jrudolph 的观点,mapConcat
可能就是您要找的。一个简单的例子展示了这个方法的实际应用:
val strings = List(
"""hello
world
test
this""",
"""foo
bar
baz
"""
)
implicit val system = ActorSystem("test")
implicit val mater = ActorFlowMaterializer()
Source(strings).
mapConcat(_.split("\n").map(_.trim).toList).
runForeach(println)
如果您运行此代码,您将看到以下打印出来的内容:
hello
world
test
this
foo
bar
baz
Akka 为此类问题提供了 Framing
辅助函数。
假设您的字符集是 UTF-8,您可以编写一个函数,它接受分隔的 String
值的最大大小和 returns 可以执行拆分的 Flow
:
import akka.stream.scaladsl.Framing
import akka.util.ByteString
val newLineSplitter : (Int) => Flow[String, String, NotUsed] =
(maxLineSize) =>
Flow[String]
.map(ByteString.apply)
.via(Framing delimiter (ByteString("\n"), maxLineSize))
.via(Flow[ByteString] map (_.utf8String))