Akka 流减少到更小的流
Akka streams reduce to smaller stream
我有一个有序的数据流
A A A B B C C C C ... (very long)
我想将其转换为形式为 (item, count) 的聚合流:
(A, 3) (B, 2) (C, 4)
为此,我可以在 Akka Streams 中使用哪些运算符?
Source.fromPublisher(publisher)
.aggregateSomehow() // ?
.runWith(sink)
我已经调查了 .groupBy 但它要求我提前知道类别的数量,而我不知道。我也相信它会将所有组保留在我想避免的内存中。我应该能够在 (A, 3) 处理后丢弃它并释放它消耗的资源。
编辑: 要求类似的功能,但使用子流。但是似乎不需要使用 SubFlows,因为我有一个使用 statefulMapConcat
组合器的解决方案。
一种选择是使用 statefulMapConcat 组合器:
Source(List("A", "A", "B", "B", "B", "C", "C", ""))
.statefulMapConcat({ () =>
var lastChar = ""
var count = 0
char => if(lastChar == char) {
count += 1
List.empty
} else {
val charCount = (lastChar, count)
lastChar = char
count = 1
List(charCount)
}
})
.runForeach(println)
然而,这需要将一个元素附加到输入流以标记结束。
输出:
(,0)
(A,2)
(B,3)
(C,2)
感谢@chunjef 评论中的建议
我有一个有序的数据流
A A A B B C C C C ... (very long)
我想将其转换为形式为 (item, count) 的聚合流:
(A, 3) (B, 2) (C, 4)
为此,我可以在 Akka Streams 中使用哪些运算符?
Source.fromPublisher(publisher)
.aggregateSomehow() // ?
.runWith(sink)
我已经调查了 .groupBy 但它要求我提前知道类别的数量,而我不知道。我也相信它会将所有组保留在我想避免的内存中。我应该能够在 (A, 3) 处理后丢弃它并释放它消耗的资源。
编辑:statefulMapConcat
组合器的解决方案。
一种选择是使用 statefulMapConcat 组合器:
Source(List("A", "A", "B", "B", "B", "C", "C", ""))
.statefulMapConcat({ () =>
var lastChar = ""
var count = 0
char => if(lastChar == char) {
count += 1
List.empty
} else {
val charCount = (lastChar, count)
lastChar = char
count = 1
List(charCount)
}
})
.runForeach(println)
然而,这需要将一个元素附加到输入流以标记结束。
输出:
(,0)
(A,2)
(B,3)
(C,2)
感谢@chunjef 评论中的建议