如何结合同一来源的“count”和“sum”计算

How to combine `count` and `sum` computations for the same source

有一些整数流:

val source = Source(List(1,2,3,4,5))

是否有可能从源中获得 (count, sum) 结果?对于上面的示例,它将是 (5, 15).

我想我应该使用流程并将它们结合起来:

val countFlow = Flow[Int].fold(0)((c, _) => c + 1)
val sumFlow = Flow[Int].fold(0)((s, e) => s + e)

如何将上述流程应用于源。或者有别的办法吗?

您可以简单地执行以下操作

source.map(list => (list.length, list.reduceLeft(_+_)))

希望对您有所帮助

最终总计

您提供的 Flow 几乎是正确的,可以在源耗尽后获得最终值:

case class Data(sum : Int = 0, count : Int = 0)

val updateData : (Data, Int) => Data = 
  (data, i) => Data(data.sum + i, data.count + 1)

val zeroData = Data()

val countAndSum = Flow[Int].fold(zeroData)(updateData)

然后可以将此流程与 Sink.head 组合以获得最终结果:

val result : Future[Data] = 
  source
    .via(countAndSum)
    .runWith(Sink[Data].head)

中间值

如果您想要 "running counter",例如你想要所有中间数据值,那么你可以使用 Flow.scan 而不是 fold:

val intermediateCountAndSum = 
  Flow[Int].scan(zeroData)(updateData)

你可以 "drain" 这些 Data 值变成 Sink.seq:

val intermediateResult : Future[Seq[Data]] = 
  source
    .via(intermediateCountAndSum)
    .runWith(Sink[Data].seq)
val graph = Source.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val fanOut = builder.add(Broadcast[Int](2))
    val merge = builder.add(Zip[Int, Int])

    source ~> fanOut ~> countFlow ~> merge.in0
              fanOut ~> sumFlow ~> merge.in1

    SourceShape(merge.out)
  })


  graph.runWith(Sink.last)
case class Stats(sum: Int, count: Int) {
  def add(el: Int): Stats = this.copy(sum = sum += el, count = count +=1)
}

object Stats {
  def empty: Stats = Stats(0, 0)
}

val countFlow = Flow[Status].fold(Stats.empty)((stats, e) => stats add e)