Akka-Streams 收集数据(Source -> Flow -> Flow (collect) -> Sink)
Akka-Streams collecting data (Source -> Flow -> Flow (collect) -> Sink)
我是 Scala 和 Akka 的新手。我有一个简单的 RunnableFlow:
Source -> Flow (do some transformation) -> Sink.runForeach
现在我想要这样的东西:
Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach
但是 Flow2 应该等到 Flow1 的 100 个元素可用,然后将这 100 个元素转换为一个新元素(需要 Flow1 的所有 100 个元素)并将这个新元素提供给 Sink。
我做了一些研究并发现 Explicit user defined buffers 但我不明白如何访问 flow1 和 flow2 中的所有 100 个元素并对它们进行一些转换。有人可以解释一下吗?或者更好的 post 一个简单的小例子?或者两者兼而有之?
Akka 定义的集合
如果您不介意使用 akka 确定的集合类型,那么您可以改用 grouped
函数:
//alternative stream formation
val stream = Source(1 to 100).via(Flow[Int].grouped(bufferSize))
.runWith(Sink foreach println)
用户自定义集合
如果您想控制用于缓冲区的集合类型,例如一个 Seq
或 Array
:
type MyCollectionType[X] = Array[X]
def emptyMyCollection[X] : MyCollectionType[X] = Array.empty[X]
那么你可以用两个Flow来执行这个操作。第一个 Flow 执行 scan
来构建一个元素序列:
val bufferSize = 10
def appendToMyCollection[X](coll : MyCollectionType[X], i : X) : MyCollectionType[X] =
(if(coll.size < bufferSize) coll else emptyMyCollection[Int]) :+ i
val buffer : Flow[Int, MyCollectionType[Int], _] =
Flow[Int].scan[MyCollectionType[Int]](emptyMyCollection[Int]){
(coll, i) => appendToMyCollection(coll, i)
}
第二个 Flow 是一个 filter
大小合适的序列(即 "goldiLocks"):
val goldiLocks : Flow[MyCollectionType[Int], MyCollectionType[Int],_] =
Flow[MyCollectionType[Int]].filter(_.size == bufferSize)
这两个 Flow 可以组合生成一个 Stream,它将生成所需的集合类型:
val stream = Source(1 to 100).via(buffer)
.via(goldiLocks)
.runWith(Sink foreach println)
我是 Scala 和 Akka 的新手。我有一个简单的 RunnableFlow:
Source -> Flow (do some transformation) -> Sink.runForeach
现在我想要这样的东西:
Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach
但是 Flow2 应该等到 Flow1 的 100 个元素可用,然后将这 100 个元素转换为一个新元素(需要 Flow1 的所有 100 个元素)并将这个新元素提供给 Sink。
我做了一些研究并发现 Explicit user defined buffers 但我不明白如何访问 flow1 和 flow2 中的所有 100 个元素并对它们进行一些转换。有人可以解释一下吗?或者更好的 post 一个简单的小例子?或者两者兼而有之?
Akka 定义的集合
如果您不介意使用 akka 确定的集合类型,那么您可以改用 grouped
函数:
//alternative stream formation
val stream = Source(1 to 100).via(Flow[Int].grouped(bufferSize))
.runWith(Sink foreach println)
用户自定义集合
如果您想控制用于缓冲区的集合类型,例如一个 Seq
或 Array
:
type MyCollectionType[X] = Array[X]
def emptyMyCollection[X] : MyCollectionType[X] = Array.empty[X]
那么你可以用两个Flow来执行这个操作。第一个 Flow 执行 scan
来构建一个元素序列:
val bufferSize = 10
def appendToMyCollection[X](coll : MyCollectionType[X], i : X) : MyCollectionType[X] =
(if(coll.size < bufferSize) coll else emptyMyCollection[Int]) :+ i
val buffer : Flow[Int, MyCollectionType[Int], _] =
Flow[Int].scan[MyCollectionType[Int]](emptyMyCollection[Int]){
(coll, i) => appendToMyCollection(coll, i)
}
第二个 Flow 是一个 filter
大小合适的序列(即 "goldiLocks"):
val goldiLocks : Flow[MyCollectionType[Int], MyCollectionType[Int],_] =
Flow[MyCollectionType[Int]].filter(_.size == bufferSize)
这两个 Flow 可以组合生成一个 Stream,它将生成所需的集合类型:
val stream = Source(1 to 100).via(buffer)
.via(goldiLocks)
.runWith(Sink foreach println)