在 Akka-Streams 中拆分流

Splitting inside a flow in Akka-Streams

我正在尝试提出一种解决方案,将我接收到的传入字符串拆分为多个字符串。我一直在研究,看起来在以前版本的 Akka-Streams 中有一个 class Transformer,你可以扩展它来进行这种转换。

在我使用的版本 (RC2) 中有 Stages 但我不太确定如何实现拆分模式。

Source.actorPublisher[String](MyActor.props).
.XXXXX(_.split("\n"))
.map(...)
.to(Sink(...))

我正在寻找 XXXXX 组件,它允许我输入一个 String 和 return 一个 String 的序列,并将每一个发送给其余的流量。

我同意@jrudolph 的观点,mapConcat 可能就是您要找的。一个简单的例子展示了这个方法的实际应用:

  val strings = List(
  """hello
     world
     test
     this""",
     """foo
     bar
     baz
     """

  )

  implicit val system = ActorSystem("test")
  implicit val mater = ActorFlowMaterializer()
  Source(strings).
    mapConcat(_.split("\n").map(_.trim).toList).
    runForeach(println)

如果您运行此代码,您将看到以下打印出来的内容:

hello
world
test
this
foo     
bar
baz

Akka 为此类问题提供了 Framing 辅助函数。

假设您的字符集是 UTF-8,您可以编写一个函数,它接受分隔的 String 值的最大大小和 returns 可以执行拆分的 Flow

import akka.stream.scaladsl.Framing
import akka.util.ByteString

val newLineSplitter : (Int) => Flow[String, String, NotUsed] = 
  (maxLineSize) =>
    Flow[String]
      .map(ByteString.apply)
      .via(Framing delimiter (ByteString("\n"), maxLineSize))
      .via(Flow[ByteString] map (_.utf8String))