Akka Stream 的 Keep right/left/both 如何导致不同的输出?

How does Akka Stream's Keep right/left/both result in a different output?

我正在努力了解 Keep 在 Akka 流中的工作原理。阅读 中的答案,我了解到这有助于控制我们从实体化器的 left/right/both 端获取结果。但是,如果我可以更改 left/right 的值并获得不同的结果,我仍然无法构建示例。

例如,

implicit val system: ActorSystem = ActorSystem("Playground")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val sentenceSource = Source(List(
  "Materialized values are confusing me",
  "I love streams",
  "Left foo right bar"
))

val wordCounter = Flow[String].fold[Int](0)((currentWords, newSentence) => currentWords + newSentence.split(" ").length)
val result = sentenceSource.viaMat(wordCounter)(Keep.left).toMat(Sink.head)(Keep.right).run()

val res = Await.result(result, 2 second)
println(res)

在此示例中,如果我将值从保持左更改为保持右,我仍然会得到相同的结果。有人可以为我提供一个基本示例,其中将 keep 更改为 left/right/both 值会导致不同的结果吗?

在您的示例中,因为:

sentenceSource: akka.stream.scaladsl.Source[String,akka.NotUsed] = ???
wordCounter: akka.stream.scaladsl.Flow[String,Int,akka.NotUsed] = ???

都有NotUsed作为它们的物化(说明它们没有有用的物化),

sentenceSource.viaMat(wordCounter)(Keep.right)
sentenceSource.viaMat(wordCounter)(Keep.left)

具有相同的具体化。但是,由于 Sink.head[T] 实现为 Future[T],更改组合器显然会产生影响

val intSource = sentenceSource.viaMat(wordCounter)(Keep.right)

val notUsed = intSource.toMat(Sink.head)(Keep.left)
// akka.stream.scaladsl.RunnableGraph[akka.NotUsed]

val intFut = intSource.toMat(Sink.head)(Keep.right)
// akka.stream.scaladsl.RunnableGraph[scala.concurrent.Future[Int]]

notUsed.run    // akka.NotUsed

intFut.run     // Future(Success(12))

Source 中的大多数来源具体化为 NotUsed 并且几乎所有常见的 Flow 运算符也是如此,因此 toMat(someSink)(Keep.right)(或等效的 .runWith(someSink)) 比使用 Keep.leftKeep.both 更普遍。 source/flow 物化最常见的用例是提供某种控制平面,例如:

import akka.Done
import akka.stream.{ CompletionStrategy, OverflowStrategy }

import system.dispatcher

val completionMatcher: PartialFunction[Any, CompletionStrategy] = { case Done => CompletionStrategy.draining }
val failureMatcher: PartialFunction[Any, Throwable] = { case 666 => new Exception("""\m/""") }

val sentenceSource = Source.actorRef[String](completionMatcher = completionMatcher, failureMatcher = failureMatcher, bufferSize = 100, overflowStrategy = OverflowStrategy.dropNew)

// same wordCounter as before
val stream = sentenceSource.viaMat(wordCounter)(Keep.left).toMat(Sink.head)(Keep.both)    // akka.stream.scaladsl.RunnableGraph[(akka.actor.ActorRef, scala.concurrent.Future[Int])]

val (sourceRef, intFut) = stream.run()

sourceRef ! "Materialized values are confusing me"
sourceRef ! "I love streams"
sourceRef ! "Left foo right bar"
sourceRef ! Done

intFut.foreach { result =>
  println(result)
  system.terminate()
}

在这种情况下,我们使用 Keep.left 传递 sentenceSource 的物化值,然后 Keep.both 获得物化值和 Sink.head 的物化值。