Akka-Streams 收集数据(Source -> Flow -> Flow (collect) -> Sink)

Akka-Streams collecting data (Source -> Flow -> Flow (collect) -> Sink)

我是 Scala 和 Akka 的新手。我有一个简单的 RunnableFlow:

Source -> Flow (do some transformation) -> Sink.runForeach

现在我想要这样的东西:

Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach

但是 Flow2 应该等到 Flow1 的 100 个元素可用,然后将这 100 个元素转换为一个新元素(需要 Flow1 的所有 100 个元素)并将这个新元素提供给 Sink。

我做了一些研究并发现 Explicit user defined buffers 但我不明白如何访问 flow1 和 flow2 中的所有 100 个元素并对它们进行一些转换。有人可以解释一下吗?或者更好的 post 一个简单的小例子?或者两者兼而有之?

Akka 定义的集合

如果您不介意使用 akka 确定的集合类型,那么您可以改用 grouped 函数:

//alternative stream formation
val stream = Source(1 to 100).via(Flow[Int].grouped(bufferSize))
                             .runWith(Sink foreach println)

用户自定义集合

如果您想控制用于缓冲区的集合类型,例如一个 SeqArray:

type MyCollectionType[X] = Array[X]

def emptyMyCollection[X] : MyCollectionType[X] = Array.empty[X]

那么你可以用两个Flow来执行这个操作。第一个 Flow 执行 scan 来构建一个元素序列:

val bufferSize = 10

def appendToMyCollection[X](coll : MyCollectionType[X], i : X) : MyCollectionType[X] = 
  (if(coll.size < bufferSize) coll else emptyMyCollection[Int]) :+ i

val buffer : Flow[Int, MyCollectionType[Int], _] = 
  Flow[Int].scan[MyCollectionType[Int]](emptyMyCollection[Int]){
    (coll, i) => appendToMyCollection(coll, i)
  }

第二个 Flow 是一个 filter 大小合适的序列(即 "goldiLocks"):

val goldiLocks : Flow[MyCollectionType[Int], MyCollectionType[Int],_] =
  Flow[MyCollectionType[Int]].filter(_.size == bufferSize)

这两个 Flow 可以组合生成一个 Stream,它将生成所需的集合类型:

val stream = Source(1 to 100).via(buffer)
                             .via(goldiLocks)
                             .runWith(Sink foreach println)