广播不会输出 Akka Streams
Broadcast not going to outputs Akka Streams
我目前正在尝试在 Scala 中使用 Akka Streams 和管道过滤器架构制作程序。我有一个特定的图表,它应该接受一个输入并将其输出到多个流。最后,所有不同流程的结果应该合并为一个。在我的例子中,输入将是各种推文。然后这些推文首先进入不同的过滤器,所有过滤器都过滤一种类型,然后进行扫描,简单地计算它看到的某种类型的数量。在此之后,我希望输出是这些扫描的 return 值并将其组合成一个元组。
现在,我为此设置了一个特定的图形 DSL,它使用 Broadcast 和 ZipWith 来执行此操作。我的代码如下:
val splitStreams =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val bcastTweets = builder.add(Broadcast[Tweet](4))
val zipTweets = builder.add(ZipWith[Int, Int, Int, Int, (Int, Int, Int, Int)]((a, b, c, d) => (a, b, c, d)))
bcastTweets.out(0) ~> retweetFlow ~> retweetCount ~> zipTweets.in0
bcastTweets.out(1) ~> replyFlow ~> replyCount ~> zipTweets.in1
bcastTweets.out(2) ~> quotedFlow ~> quotedCount ~> zipTweets.in2
bcastTweets.out(3) ~> normalFlow ~> normalCount ~> zipTweets.in3
FlowShape(bcastTweets.in, zipTweets.out)
})
问题是,当我测试这段代码时,广播似乎没有进入任何一个流程。
任何人都可以告诉我我做错了什么,我已经看了大约 2 天,但无法弄清楚。
所描述的问题与 ZipWith
(和 Zip
)无法使用过滤后的形状作为其输入有关。我的猜测是 Akka Stream 不知道如何正确压缩单独过滤的 Shapes 的元素。显然,如果所涉及的流是使用 map
.
的普通映射,ZipWith
/Zip
将起作用
您需要的一种变通方法是将 ZipWith
替换为 Merge
以及 grouped
,如以下带有多个虚拟过滤流的简单示例所示:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer() // Not needed for Akka Stream 2.6+
implicit val ec = system.dispatcher
val n = 4
def filterFlow(i: Int) = Flow[Int].filter(_ % n == i)
val customFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val bcast = builder.add(Broadcast[Int](n))
val merger = builder.add(Merge[Int](n))
(0 until n).foreach{ i =>
bcast.out(i) ~> filterFlow(i) ~> merger.in(i)
}
FlowShape(bcast.in, merger.out)
})
Source(0 to 9).via(customFlow).grouped(n).runForeach(println)
// Output:
// Vector(0, 1, 2, 3)
// Vector(4, 5, 6, 7)
// Vector(8, 9)
如果输出需要是元组,只需像下面那样应用 collect
(例如对于 n = 4):
val empty = -1 // Default place-holder value
Source(0 to 9).via(customFlow).grouped(n).collect{
case Vector(a) => (a, empty, empty, empty)
case Vector(a, b) => (a, b, empty, empty)
case Vector(a, b, c) => (a, b, c, empty)
case Vector(a, b, c, d) => (a, b, c, d)
}.runForeach(println)
// Output:
// (0, 1, 2, 3)
// (4, 5, 6, 7)
// (8, 9, -1, -1)
这里是一些关于正在发生的事情的背景:
Zip
要求每个上游都有一个元素被压缩成一个元组(如果它还没有看到一个位置,它不能弥补一个位置的值)并且不会要求更多上游的元素,直到它压缩了一个元组并发送到下游。
另一方面,Broadcast
只有在看到所有下游的需求时才能发出,以便它可以安全地向所有下游发出一个元素。因此,如果广播和 zip 之间的流之一丢失任何元素,您最终会遇到卡住的流 - zip 无法要求更多,广播无法向所有人发送。
您可以通过在每个广播流中添加分离或缓冲区作为第一个运算符来摆脱这种僵局。你必须仔细考虑是否是你想要实现的目标。
Merge
并且只会发出从任何上游到下游的单个元素,并且 MergeLatest
会在它看到所有输入上的至少一个输入时立即发出(这意味着如果第一个元素在其中一个输入上被过滤,它也可能会死锁)然后也可能重复值,所以这两者都与压缩有很大不同。
我目前正在尝试在 Scala 中使用 Akka Streams 和管道过滤器架构制作程序。我有一个特定的图表,它应该接受一个输入并将其输出到多个流。最后,所有不同流程的结果应该合并为一个。在我的例子中,输入将是各种推文。然后这些推文首先进入不同的过滤器,所有过滤器都过滤一种类型,然后进行扫描,简单地计算它看到的某种类型的数量。在此之后,我希望输出是这些扫描的 return 值并将其组合成一个元组。
现在,我为此设置了一个特定的图形 DSL,它使用 Broadcast 和 ZipWith 来执行此操作。我的代码如下:
val splitStreams =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val bcastTweets = builder.add(Broadcast[Tweet](4))
val zipTweets = builder.add(ZipWith[Int, Int, Int, Int, (Int, Int, Int, Int)]((a, b, c, d) => (a, b, c, d)))
bcastTweets.out(0) ~> retweetFlow ~> retweetCount ~> zipTweets.in0
bcastTweets.out(1) ~> replyFlow ~> replyCount ~> zipTweets.in1
bcastTweets.out(2) ~> quotedFlow ~> quotedCount ~> zipTweets.in2
bcastTweets.out(3) ~> normalFlow ~> normalCount ~> zipTweets.in3
FlowShape(bcastTweets.in, zipTweets.out)
})
问题是,当我测试这段代码时,广播似乎没有进入任何一个流程。
任何人都可以告诉我我做错了什么,我已经看了大约 2 天,但无法弄清楚。
所描述的问题与 ZipWith
(和 Zip
)无法使用过滤后的形状作为其输入有关。我的猜测是 Akka Stream 不知道如何正确压缩单独过滤的 Shapes 的元素。显然,如果所涉及的流是使用 map
.
ZipWith
/Zip
将起作用
您需要的一种变通方法是将 ZipWith
替换为 Merge
以及 grouped
,如以下带有多个虚拟过滤流的简单示例所示:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer() // Not needed for Akka Stream 2.6+
implicit val ec = system.dispatcher
val n = 4
def filterFlow(i: Int) = Flow[Int].filter(_ % n == i)
val customFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val bcast = builder.add(Broadcast[Int](n))
val merger = builder.add(Merge[Int](n))
(0 until n).foreach{ i =>
bcast.out(i) ~> filterFlow(i) ~> merger.in(i)
}
FlowShape(bcast.in, merger.out)
})
Source(0 to 9).via(customFlow).grouped(n).runForeach(println)
// Output:
// Vector(0, 1, 2, 3)
// Vector(4, 5, 6, 7)
// Vector(8, 9)
如果输出需要是元组,只需像下面那样应用 collect
(例如对于 n = 4):
val empty = -1 // Default place-holder value
Source(0 to 9).via(customFlow).grouped(n).collect{
case Vector(a) => (a, empty, empty, empty)
case Vector(a, b) => (a, b, empty, empty)
case Vector(a, b, c) => (a, b, c, empty)
case Vector(a, b, c, d) => (a, b, c, d)
}.runForeach(println)
// Output:
// (0, 1, 2, 3)
// (4, 5, 6, 7)
// (8, 9, -1, -1)
这里是一些关于正在发生的事情的背景:
Zip
要求每个上游都有一个元素被压缩成一个元组(如果它还没有看到一个位置,它不能弥补一个位置的值)并且不会要求更多上游的元素,直到它压缩了一个元组并发送到下游。
Broadcast
只有在看到所有下游的需求时才能发出,以便它可以安全地向所有下游发出一个元素。因此,如果广播和 zip 之间的流之一丢失任何元素,您最终会遇到卡住的流 - zip 无法要求更多,广播无法向所有人发送。
您可以通过在每个广播流中添加分离或缓冲区作为第一个运算符来摆脱这种僵局。你必须仔细考虑是否是你想要实现的目标。
Merge
并且只会发出从任何上游到下游的单个元素,并且 MergeLatest
会在它看到所有输入上的至少一个输入时立即发出(这意味着如果第一个元素在其中一个输入上被过滤,它也可能会死锁)然后也可能重复值,所以这两者都与压缩有很大不同。