How do I group items of a sorted stream with SubFlows?
Can you explain how to use the new groupBy in akka-streams? The documentation does not seem very helpful. groupBy used to return (T, Source), but it no longer does. Here is my example (modeled on one from the docs):
Source(List(
  1 -> "1a", 1 -> "1b", 1 -> "1c",
  2 -> "2a", 2 -> "2b",
  3 -> "3a", 3 -> "3b", 3 -> "3c",
  4 -> "4a",
  5 -> "5a", 5 -> "5b", 5 -> "5c",
  6 -> "6a", 6 -> "6b",
  7 -> "7a",
  8 -> "8a", 8 -> "8b",
  9 -> "9a", 9 -> "9b",
))
  .groupBy(3, _._1)
  .map { case (aid, raw) =>
    aid -> List(raw)
  }
  .reduce[(Int, List[String])] { case (l: (Int, List[String]), r: (Int, List[String])) =>
    (l._1, l._2 ::: r._2)
  }
  .mergeSubstreams
  .runForeach { case (aid: Int, items: List[String]) =>
    println(s"$aid - ${items.length}")
  }
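This simply hangs. Perhaps it hangs because the number of substreams is lower than the number of unique keys. But what should I do with an infinite stream? I want to group until the key changes.
In my real stream, the data is always sorted by the value I am grouping by. Maybe I don't need groupBy at all?
If your stream data is always sorted, you can take advantage of that and group it this way: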
// assumes an implicit ActorSystem (or Materializer) is in scope
import akka.stream.scaladsl.{Sink, Source}

val source = Source(List(
  1 -> "1a", 1 -> "1b", 1 -> "1c",
  2 -> "2a", 2 -> "2b",
  3 -> "3a", 3 -> "3b", 3 -> "3c",
  4 -> "4a",
  5 -> "5a", 5 -> "5b", 5 -> "5c",
  6 -> "6a", 6 -> "6b",
  7 -> "7a",
  8 -> "8a", 8 -> "8b",
  9 -> "9a", 9 -> "9b",
))
source
  // slide a window over each pair of consecutive elements
  .sliding(2, 1)
  // when both keys in a pair are different, we split the group into a subflow
  .splitAfter(pair => (pair.headOption, pair.lastOption) match {
    case (Some((key1, _)), Some((key2, _))) => key1 != key2
  })
  // then we keep only the first element of each pair
  // to reconstruct the original stream, but grouped by sorted key
  // (the stream's final element is never the head of a pair, so it is not emitted)
  .mapConcat(_.headOption.toList)
  // then we fold the substream into a single element
  .fold(0 -> List.empty[String]) {
    case ((_, values), (key, value)) => key -> (value +: values)
  }
  // merge it back and dump the results
  .mergeSubstreams
  .runWith(Sink.foreach(println))
In the end you get these results:
(1,List(1c, 1b, 1a))
(2,List(2b, 2a))
(3,List(3c, 3b, 3a))
(4,List(4a))
(5,List(5c, 5b, 5a))
(6,List(6b, 6a))
(7,List(7a))
(8,List(8b, 8a))
(9,List(9a))
Unlike with groupBy, however, you are not limited by the number of distinct keys.
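The results above stop at 9a because 9b only ever shows up as the second element of a window, and mapConcat keeps window heads only. Here is a minimal sketch of one way around this, assuming Akka 2.6 (where an implicit ActorSystem supplies the materializer): wrap each element in Some and append a single None as an end marker, so that the last real element also becomes a window head. The names GroupSortedWithSentinel and grouped are invented for this illustration, and the fold appends instead of prepending so that values keep their arrival order.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Future

object GroupSortedWithSentinel {
  // Hypothetical helper (not part of Akka): groups a key-sorted source
  // and collects every (key, values) group into a Seq.
  def grouped(source: Source[(Int, String), NotUsed])(
      implicit system: ActorSystem): Future[immutable.Seq[(Int, List[String])]] =
    source
      .map(Option(_))
      // end marker, so the last real element also becomes the head of a window
      .concat(Source.single(Option.empty[(Int, String)]))
      .sliding(2, 1)
      // close the current substream after a window whose two keys differ
      .splitAfter { window =>
        (window.head, window.last) match {
          case (Some((k1, _)), Some((k2, _))) => k1 != k2
          case _                              => false
        }
      }
      // keep only the head of each window; the None marker is never a head
      // unless the source is empty, and then toList drops it
      .mapConcat(_.head.toList)
      // append, so values stay in arrival order
      .fold(0 -> List.empty[String]) {
        case ((_, values), (key, value)) => key -> (values :+ value)
      }
      .mergeSubstreams
      .runWith(Sink.seq)
}
```

The trade-off is one extra Option allocation per element in exchange for not losing the tail of the stream.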
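You could also implement it with statefulMapConcat, which is slightly cheaper because it does not materialize any substreams (but you have to live with the shame of using vars):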
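// caveat: the group still held in acc when upstream completes is never emitted
source.statefulMapConcat { () =>
  var prevKey: Option[Int] = None
  var acc: List[String] = Nil

  { case (newKey, str) =>
    prevKey match {
      case Some(`newKey`) | None =>
        // same key (or very first element): keep accumulating
        prevKey = Some(newKey)
        acc = str :: acc
        Nil
      case Some(oldKey) =>
        // key changed: emit the finished group and start a new one
        val accForOldKey = acc.reverse
        prevKey = Some(newKey)
        acc = str :: Nil
        (oldKey -> accForOldKey) :: Nil
    }
  }
}.runForeach(println)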
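Because statefulMapConcat has no hook for upstream completion, the snippet above never emits the group that is still being accumulated when the stream ends. Here is a hedged sketch of one workaround, using the same end-marker idea: append a single None and treat it as the signal to flush. GroupSortedStateful and flow are names made up for this example, and the key and value types are fixed to Int and String only to match the sample data.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

object GroupSortedStateful {
  // Hypothetical flow (not part of Akka): emits (key, values) each time the key
  // changes, and once more when the None end marker arrives.
  def flow: Flow[(Int, String), (Int, List[String]), NotUsed] =
    Flow[(Int, String)]
      .map(Option(_))
      // end marker that triggers the final flush
      .concat(Source.single(Option.empty[(Int, String)]))
      .statefulMapConcat[(Int, List[String])] { () =>
        var prevKey: Option[Int] = None
        var acc: List[String] = Nil // current group, newest element first

        (elem: Option[(Int, String)]) => {
          // the group collected so far, or nothing before the first element
          def finished: List[(Int, List[String])] =
            prevKey.map(_ -> acc.reverse).toList

          elem match {
            case Some((key, str)) if prevKey.contains(key) =>
              acc = str :: acc
              Nil
            case Some((key, str)) =>
              val out = finished
              prevKey = Some(key)
              acc = str :: Nil
              out
            case None =>
              val out = finished
              prevKey = None
              acc = Nil
              out
          }
        }
      }
}
```

It plugs in with source.via(GroupSortedStateful.flow); the state is created inside the factory function, so each materialization starts with a fresh accumulator.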
I ended up implementing a custom stage:
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.mutable.ListBuffer

class GroupAfterKeyChangeStage[K, T](keyForItem: T => K, maxBufferSize: Int) extends GraphStage[FlowShape[T, List[T]]] {

  private val in = Inlet[T]("GroupAfterKeyChangeStage.in")
  private val out = Outlet[List[T]]("GroupAfterKeyChangeStage.out")

  override val shape: FlowShape[T, List[T]] =
    FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {

    private val buffer = new ListBuffer[T]
    private var currentKey: Option[K] = None

    // InHandler
    override def onPush(): Unit = {
      val nextItem = grab(in)
      val nextItemKey = keyForItem(nextItem)

      if (currentKey.forall(_ == nextItemKey)) {
        // same key (or very first item): keep buffering
        if (currentKey.isEmpty)
          currentKey = Some(nextItemKey)

        if (buffer.size == maxBufferSize)
          failStage(new RuntimeException(s"Maximum buffer size is exceeded on key $nextItemKey"))
        else {
          buffer += nextItem
          pull(in)
        }
      } else {
        // key changed: emit the finished group and start a new one
        val result = buffer.result()
        buffer.clear()
        buffer += nextItem
        currentKey = Some(nextItemKey)
        push(out, result)
      }
    }

    // OutHandler
    override def onPull(): Unit = {
      if (isClosed(in))
        failStage(new RuntimeException("Upstream finished but there was a truncated final frame in the buffer"))
      else
        pull(in)
    }

    // InHandler
    override def onUpstreamFinish(): Unit = {
      // flush the last group, if any, before completing
      val result = buffer.result()
      if (result.nonEmpty)
        emit(out, result)
      completeStage()
    }

    override def postStop(): Unit = {
      buffer.clear()
    }

    setHandlers(in, out, this)
  }
}
If you don't want to copy-paste it, I have added it to a helper library that I maintain. To use it, add
Resolver.bintrayRepo("cppexpert", "maven")
to your resolvers, then add the following to your dependencies:
"com.walkmind" %% "scala-tricks" % "2.15"
It is implemented in com.walkmind.akkastream.FlowExt as a flow:
def groupSortedByKey[K, T](keyForItem: T ⇒ K, maxBufferSize: Int): Flow[T, List[T], NotUsed]
Applied to my example:
source
.via(FlowExt.groupSortedByKey(_._1, 128))
A year later, Akka Stream Contrib has an AccumulateWhileUnchanged class that does this:
libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"
and:
import akka.stream.contrib.AccumulateWhileUnchanged
source.via(new AccumulateWhileUnchanged(_._1))