如何结合同一来源的“count”和“sum”计算
How to combine `count` and `sum` computations for the same source
有一些整数流:
val source = Source(List(1,2,3,4,5))
是否有可能从源中获得 (count, sum)
结果?对于上面的示例,它将是 (5, 15)
.
我想我应该使用流程并将它们结合起来:
val countFlow = Flow[Int].fold(0)((c, _) => c + 1)
val sumFlow = Flow[Int].fold(0)((s, e) => s + e)
如何将上述流程应用于源。或者有别的办法吗?
您可以简单地执行以下操作
source.map(list => (list.length, list.reduceLeft(_+_)))
希望对您有所帮助
最终总计
您提供的 Flow
几乎是正确的,可以在源耗尽后获得最终值:
case class Data(sum : Int = 0, count : Int = 0)
val updateData : (Data, Int) => Data =
(data, i) => Data(data.sum + i, data.count + 1)
val zeroData = Data()
val countAndSum = Flow[Int].fold(zeroData)(updateData)
然后可以将此流程与 Sink.head
组合以获得最终结果:
val result : Future[Data] =
source
.via(countAndSum)
.runWith(Sink[Data].head)
中间值
如果您想要 "running counter",例如你想要所有中间数据值,那么你可以使用 Flow.scan
而不是 fold:
val intermediateCountAndSum =
Flow[Int].scan(zeroData)(updateData)
你可以 "drain" 这些 Data
值变成 Sink.seq
:
val intermediateResult : Future[Seq[Data]] =
source
.via(intermediateCountAndSum)
.runWith(Sink[Data].seq)
val graph = Source.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val fanOut = builder.add(Broadcast[Int](2))
val merge = builder.add(Zip[Int, Int])
source ~> fanOut ~> countFlow ~> merge.in0
fanOut ~> sumFlow ~> merge.in1
SourceShape(merge.out)
})
graph.runWith(Sink.last)
case class Stats(sum: Int, count: Int) {
def add(el: Int): Stats = this.copy(sum = sum += el, count = count +=1)
}
object Stats {
def empty: Stats = Stats(0, 0)
}
val countFlow = Flow[Status].fold(Stats.empty)((stats, e) => stats add e)
有一些整数流:
val source = Source(List(1,2,3,4,5))
是否有可能从源中获得 (count, sum)
结果?对于上面的示例,它将是 (5, 15)
.
我想我应该使用流程并将它们结合起来:
val countFlow = Flow[Int].fold(0)((c, _) => c + 1)
val sumFlow = Flow[Int].fold(0)((s, e) => s + e)
如何将上述流程应用于源。或者有别的办法吗?
您可以简单地执行以下操作
source.map(list => (list.length, list.reduceLeft(_+_)))
希望对您有所帮助
最终总计
您提供的 Flow
几乎是正确的,可以在源耗尽后获得最终值:
case class Data(sum : Int = 0, count : Int = 0)
val updateData : (Data, Int) => Data =
(data, i) => Data(data.sum + i, data.count + 1)
val zeroData = Data()
val countAndSum = Flow[Int].fold(zeroData)(updateData)
然后可以将此流程与 Sink.head
组合以获得最终结果:
val result : Future[Data] =
source
.via(countAndSum)
.runWith(Sink[Data].head)
中间值
如果您想要 "running counter",例如你想要所有中间数据值,那么你可以使用 Flow.scan
而不是 fold:
val intermediateCountAndSum =
Flow[Int].scan(zeroData)(updateData)
你可以 "drain" 这些 Data
值变成 Sink.seq
:
val intermediateResult : Future[Seq[Data]] =
source
.via(intermediateCountAndSum)
.runWith(Sink[Data].seq)
val graph = Source.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val fanOut = builder.add(Broadcast[Int](2))
val merge = builder.add(Zip[Int, Int])
source ~> fanOut ~> countFlow ~> merge.in0
fanOut ~> sumFlow ~> merge.in1
SourceShape(merge.out)
})
graph.runWith(Sink.last)
case class Stats(sum: Int, count: Int) {
def add(el: Int): Stats = this.copy(sum = sum += el, count = count +=1)
}
object Stats {
def empty: Stats = Stats(0, 0)
}
val countFlow = Flow[Status].fold(Stats.empty)((stats, e) => stats add e)