交替减少以发送实时统计数据(减少)
Alternate to reduce to send live stats (of reduce)
我有一个愚蠢的问题,但不知道原因:
import akka.{Done, NotUsed}
import akka.actor.Status.Success
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.Future
object Generic {
def main(args: Array[String]) {
implicit val system = ActorSystem("system")
implicit val mat = ActorMaterializer()
val sink: Sink[Any, Future[Done]] = Sink.foreach(x => println("Ans =====> " + x))
val counts = Flow[String]
.mapConcat(x => x.split("\s").toList)
.filter(!_.isEmpty)
.groupBy(Int.MaxValue, identity)
.map(x => x -> 1)
.reduce((l, r) => (l._1, l._2 + r._2))
.mergeSubstreams
val fold: Flow[String, Int, NotUsed] = Flow[String].map(x => 1).fold(0)(_ + _)
val words: RunnableGraph[ActorRef] = Source.actorRef(Int.MaxValue, OverflowStrategy.fail)
.via(counts)
.to(sink)
val ref = words.run()
for {
ln <- scala.io.Source.stdin.getLines.takeWhile(_ != "-1")
} {
println("---> Message sent " + ln)
ref ! ln
}
ref ! Success("end")
Thread.sleep(5000)
system.terminate()
}
}
它做的事情很简单:在应用程序终端上,我输入句子。它提取单词,然后保持每个单词的频率。它按预期工作。问题是:
- 源头是无限的流。即只有当我结束源时,它才会打印输出。我可以重构程序以始终打印实时统计信息而不是结束吗?我明白,这种行为是由于
reduce
一个蹩脚的方法是在 reduce
中有一个打印语句。但是我可以做点别的吗,比如将每个句子的实时统计信息 post 发送到另一个接收器(通过广播?)
看看 scan
组合器。它会给你 fold
/reduce
的聚合能力,但它会发出中间结果。
// .reduce((l, r) => (l._1, l._2 + r._2))
.scan("" → 0)((l, r) => (l._1, l._2 + r._2))
此外,如果您想将输出发送到日志 Sink
,您可以查看 alsoTo
,这将有效地执行广播到选择的一方 Sink
.
我有一个愚蠢的问题,但不知道原因:
import akka.{Done, NotUsed}
import akka.actor.Status.Success
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.Future
object Generic {
def main(args: Array[String]) {
implicit val system = ActorSystem("system")
implicit val mat = ActorMaterializer()
val sink: Sink[Any, Future[Done]] = Sink.foreach(x => println("Ans =====> " + x))
val counts = Flow[String]
.mapConcat(x => x.split("\s").toList)
.filter(!_.isEmpty)
.groupBy(Int.MaxValue, identity)
.map(x => x -> 1)
.reduce((l, r) => (l._1, l._2 + r._2))
.mergeSubstreams
val fold: Flow[String, Int, NotUsed] = Flow[String].map(x => 1).fold(0)(_ + _)
val words: RunnableGraph[ActorRef] = Source.actorRef(Int.MaxValue, OverflowStrategy.fail)
.via(counts)
.to(sink)
val ref = words.run()
for {
ln <- scala.io.Source.stdin.getLines.takeWhile(_ != "-1")
} {
println("---> Message sent " + ln)
ref ! ln
}
ref ! Success("end")
Thread.sleep(5000)
system.terminate()
}
}
它做的事情很简单:在应用程序终端上,我输入句子。它提取单词,然后保持每个单词的频率。它按预期工作。问题是:
- 源头是无限的流。即只有当我结束源时,它才会打印输出。我可以重构程序以始终打印实时统计信息而不是结束吗?我明白,这种行为是由于
reduce
一个蹩脚的方法是在 reduce
中有一个打印语句。但是我可以做点别的吗,比如将每个句子的实时统计信息 post 发送到另一个接收器(通过广播?)
看看 scan
组合器。它会给你 fold
/reduce
的聚合能力,但它会发出中间结果。
// .reduce((l, r) => (l._1, l._2 + r._2))
.scan("" → 0)((l, r) => (l._1, l._2 + r._2))
此外,如果您想将输出发送到日志 Sink
,您可以查看 alsoTo
,这将有效地执行广播到选择的一方 Sink
.