Akka Stream 的 Keep right/left/both 如何导致不同的输出?
How does Akka Stream's Keep right/left/both result in a different output?
我正在努力了解 Keep 在 Akka 流中的工作原理。阅读 中的答案,我了解到这有助于控制我们从实体化器的 left/right/both 端获取结果。但是,如果我可以更改 left/right 的值并获得不同的结果,我仍然无法构建示例。
例如,
implicit val system: ActorSystem = ActorSystem("Playground")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val sentenceSource = Source(List(
"Materialized values are confusing me",
"I love streams",
"Left foo right bar"
))
val wordCounter = Flow[String].fold[Int](0)((currentWords, newSentence) => currentWords + newSentence.split(" ").length)
val result = sentenceSource.viaMat(wordCounter)(Keep.left).toMat(Sink.head)(Keep.right).run()
val res = Await.result(result, 2 second)
println(res)
在此示例中,如果我将值从保持左更改为保持右,我仍然会得到相同的结果。有人可以为我提供一个基本示例,其中将 keep 更改为 left/right/both 值会导致不同的结果吗?
在您的示例中,因为:
sentenceSource: akka.stream.scaladsl.Source[String,akka.NotUsed] = ???
wordCounter: akka.stream.scaladsl.Flow[String,Int,akka.NotUsed] = ???
都有NotUsed
作为它们的物化(说明它们没有有用的物化),
sentenceSource.viaMat(wordCounter)(Keep.right)
sentenceSource.viaMat(wordCounter)(Keep.left)
具有相同的具体化。但是,由于 Sink.head[T]
实现为 Future[T]
,更改组合器显然会产生影响
val intSource = sentenceSource.viaMat(wordCounter)(Keep.right)
val notUsed = intSource.toMat(Sink.head)(Keep.left)
// akka.stream.scaladsl.RunnableGraph[akka.NotUsed]
val intFut = intSource.toMat(Sink.head)(Keep.right)
// akka.stream.scaladsl.RunnableGraph[scala.concurrent.Future[Int]]
notUsed.run // akka.NotUsed
intFut.run // Future(Success(12))
Source
中的大多数来源具体化为 NotUsed
并且几乎所有常见的 Flow
运算符也是如此,因此 toMat(someSink)(Keep.right)
(或等效的 .runWith(someSink)
) 比使用 Keep.left
或 Keep.both
更普遍。 source/flow 物化最常见的用例是提供某种控制平面,例如:
import akka.Done
import akka.stream.{ CompletionStrategy, OverflowStrategy }
import system.dispatcher
val completionMatcher: PartialFunction[Any, CompletionStrategy] = { case Done => CompletionStrategy.draining }
val failureMatcher: PartialFunction[Any, Throwable] = { case 666 => new Exception("""\m/""") }
val sentenceSource = Source.actorRef[String](completionMatcher = completionMatcher, failureMatcher = failureMatcher, bufferSize = 100, overflowStrategy = OverflowStrategy.dropNew)
// same wordCounter as before
val stream = sentenceSource.viaMat(wordCounter)(Keep.left).toMat(Sink.head)(Keep.both) // akka.stream.scaladsl.RunnableGraph[(akka.actor.ActorRef, scala.concurrent.Future[Int])]
val (sourceRef, intFut) = stream.run()
sourceRef ! "Materialized values are confusing me"
sourceRef ! "I love streams"
sourceRef ! "Left foo right bar"
sourceRef ! Done
intFut.foreach { result =>
println(result)
system.terminate()
}
在这种情况下,我们使用 Keep.left
传递 sentenceSource
的物化值,然后 Keep.both
获得物化值和 Sink.head
的物化值。
我正在努力了解 Keep 在 Akka 流中的工作原理。阅读
例如,
implicit val system: ActorSystem = ActorSystem("Playground")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val sentenceSource = Source(List(
"Materialized values are confusing me",
"I love streams",
"Left foo right bar"
))
val wordCounter = Flow[String].fold[Int](0)((currentWords, newSentence) => currentWords + newSentence.split(" ").length)
val result = sentenceSource.viaMat(wordCounter)(Keep.left).toMat(Sink.head)(Keep.right).run()
val res = Await.result(result, 2 second)
println(res)
在此示例中,如果我将值从保持左更改为保持右,我仍然会得到相同的结果。有人可以为我提供一个基本示例,其中将 keep 更改为 left/right/both 值会导致不同的结果吗?
在您的示例中,因为:
sentenceSource: akka.stream.scaladsl.Source[String,akka.NotUsed] = ???
wordCounter: akka.stream.scaladsl.Flow[String,Int,akka.NotUsed] = ???
都有NotUsed
作为它们的物化(说明它们没有有用的物化),
sentenceSource.viaMat(wordCounter)(Keep.right)
sentenceSource.viaMat(wordCounter)(Keep.left)
具有相同的具体化。但是,由于 Sink.head[T]
实现为 Future[T]
,更改组合器显然会产生影响
val intSource = sentenceSource.viaMat(wordCounter)(Keep.right)
val notUsed = intSource.toMat(Sink.head)(Keep.left)
// akka.stream.scaladsl.RunnableGraph[akka.NotUsed]
val intFut = intSource.toMat(Sink.head)(Keep.right)
// akka.stream.scaladsl.RunnableGraph[scala.concurrent.Future[Int]]
notUsed.run // akka.NotUsed
intFut.run // Future(Success(12))
Source
中的大多数来源具体化为 NotUsed
并且几乎所有常见的 Flow
运算符也是如此,因此 toMat(someSink)(Keep.right)
(或等效的 .runWith(someSink)
) 比使用 Keep.left
或 Keep.both
更普遍。 source/flow 物化最常见的用例是提供某种控制平面,例如:
import akka.Done
import akka.stream.{ CompletionStrategy, OverflowStrategy }
import system.dispatcher
val completionMatcher: PartialFunction[Any, CompletionStrategy] = { case Done => CompletionStrategy.draining }
val failureMatcher: PartialFunction[Any, Throwable] = { case 666 => new Exception("""\m/""") }
val sentenceSource = Source.actorRef[String](completionMatcher = completionMatcher, failureMatcher = failureMatcher, bufferSize = 100, overflowStrategy = OverflowStrategy.dropNew)
// same wordCounter as before
val stream = sentenceSource.viaMat(wordCounter)(Keep.left).toMat(Sink.head)(Keep.both) // akka.stream.scaladsl.RunnableGraph[(akka.actor.ActorRef, scala.concurrent.Future[Int])]
val (sourceRef, intFut) = stream.run()
sourceRef ! "Materialized values are confusing me"
sourceRef ! "I love streams"
sourceRef ! "Left foo right bar"
sourceRef ! Done
intFut.foreach { result =>
println(result)
system.terminate()
}
在这种情况下,我们使用 Keep.left
传递 sentenceSource
的物化值,然后 Keep.both
获得物化值和 Sink.head
的物化值。