交替减少以发送实时统计数据(减少)

Alternate to reduce to send live stats (of reduce)

我有一个愚蠢的问题,但不知道原因:

import akka.{Done, NotUsed}
import akka.actor.Status.Success
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.Future


object Generic {
  def main(args: Array[String]) {

    implicit val system = ActorSystem("system")
    implicit val mat = ActorMaterializer()

    val sink: Sink[Any, Future[Done]] = Sink.foreach(x => println("Ans =====> " + x))

    val counts = Flow[String]
      .mapConcat(x => x.split("\s").toList)
      .filter(!_.isEmpty)
      .groupBy(Int.MaxValue, identity)
      .map(x => x -> 1)
      .reduce((l, r) => (l._1, l._2 + r._2))
      .mergeSubstreams

    val fold: Flow[String, Int, NotUsed] = Flow[String].map(x => 1).fold(0)(_ + _)

    val words: RunnableGraph[ActorRef] = Source.actorRef(Int.MaxValue, OverflowStrategy.fail)
      .via(counts)
      .to(sink)

    val ref = words.run()

    for {
      ln <- scala.io.Source.stdin.getLines.takeWhile(_ != "-1")
    } {
      println("---> Message sent " + ln)
      ref ! ln
    }
    ref ! Success("end")
    Thread.sleep(5000)
    system.terminate()
  }
}

它做的事情很简单:在应用程序终端上,我输入句子。它提取单词,然后保持每个单词的频率。它按预期工作。问题是:

一个蹩脚的方法是在 reduce 中有一个打印语句。但是我可以做点别的吗,比如将每个句子的实时统计信息 post 发送到另一个接收器(通过广播?)

看看 scan 组合器。它会给你 fold/reduce 的聚合能力,但它会发出中间结果。

//    .reduce((l, r) => (l._1, l._2 + r._2))
      .scan("" → 0)((l, r) => (l._1, l._2 + r._2))

此外,如果您想将输出发送到日志 Sink,您可以查看 alsoTo,这将有效地执行广播到选择的一方 Sink .