Akka Streams 中 Balance 和 Broadcast fan out 的区别
Difference between Balance and Broadcast fan out in Akka Streams
我对 Akka streams
中的扇出策略有点困惑,我读到了
Broadcast
– (1 input, N outputs) 给定输入元素发射到每个输出,而 Balance
– (1 input, N outputs) 给定输入元素发射到其输出端口之一。
你能解释一下吗:
- 平衡如何与多个消费者一起使用?
- 短语的含义 "emits to one of its output ports"
- 下游端口是否相同?
- 是否'Balance'表示将输入流复制到几个输出分区
- "balance is enabling graphs to be split apart and multiple instances of downstream subscribers replicated to handle the volume" 是什么意思?
来自文档...广播将元素发射(发送)给每个消费者。 balance 只发送给第一个可用的消费者。
Emit each incoming element each of n outputs.
Fan-out the stream to several streams. Each upstream element is
emitted to the first available downstream consumer.
根据评论编辑:
根据您的要点,您应该制作两个 averageCarrierDelay 函数,每个函数对应 Z
和 F
。然后就可以看到发送给每个的所有元素了。
val averageCarrierDelayZ =
Flow[FlightDelayRecord]
.groupBy(30, _.uniqueCarrier)
.fold(("", 0, 0)){
(x: (String, Int, Int), y:FlightDelayRecord) => {
println(s"Z Received Element: ${y}")
val count = x._2 + 1
val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
(y.uniqueCarrier, count, totalMins)
}
}.mergeSubstreams
val averageCarrierDelayF =
Flow[FlightDelayRecord]
.groupBy(30, _.uniqueCarrier)
.fold(("", 0, 0)){
(x: (String, Int, Int), y:FlightDelayRecord) => {
println(s"F Received Element: ${y}")
val count = x._2 + 1
val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
(y.uniqueCarrier, count, totalMins)
}
}.mergeSubstreams
编辑 2:为了在将来进行检查,我建议为流阶段使用一个通用记录器,这样您就可以看到发生了什么。
def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a }
这样做可以让您做类似的事情:
D ~> logElement[FlightDelayRecord]("F received: ") ~> F
D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z
通过这种方式,您可以检查图表区域是否存在您可能预料到或可能不预料到的奇怪行为。
正如其他人已经说过的,broadcast 将其输入发送到 all 个输出端口,而 balance 将其输入发送到 one 个输出端口关于背压。
当您使用GraphStage
时,您需要选择您要使用的输出端口。考虑这个例子:
val q1 = Source.queue[Int](10, OverflowStrategy.fail)
val q2 = Source.queue[Int](10, OverflowStrategy.fail)
GraphDSL.create(q1, q2)(Keep.both) { implicit b => (input1, input2) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Int](2))
val balance = b.add(Balance[Int](2))
val consumer1, consumer2, consumer3, consumer4 = b.add(Sink.foreach[Int](println))
input1 ~> broadcast.in
input2 ~> balance.in
broadcast.out(0) ~> consumer1
broadcast.out(1) ~> consumer2
balance.out(0) ~> consumer3
balance.out(1) ~> consumer4
ClosedShape
}
在这里,我们将一个输入连接到广播级,将一个输入连接到平衡级。然后我们将广播和平衡阶段的不同输出端口连接到各自的消费者。
在这种特殊情况下,当您 运行 流时,来自第一个输入的元素将同时传递给 consumer1
和 consumer2
,因为广播阶段会复制其输入到它的所有输出(这里有两个输出),通过第二个输入的元素将根据终端的速度(即 [=16 的速度)均匀分布在 consumer3
和 consumer4
=]), 因为Sink.foreach
函数执行时间长了会背压
请注意,我们已指定广播和平衡阶段各有 2 个端口(在调用它们的工厂方法时),并且我们已指定连接到哪个消费者的输出端口。
我对 Akka streams
中的扇出策略有点困惑,我读到了
Broadcast
– (1 input, N outputs) 给定输入元素发射到每个输出,而 Balance
– (1 input, N outputs) 给定输入元素发射到其输出端口之一。
你能解释一下吗:
- 平衡如何与多个消费者一起使用?
- 短语的含义 "emits to one of its output ports"
- 下游端口是否相同?
- 是否'Balance'表示将输入流复制到几个输出分区
- "balance is enabling graphs to be split apart and multiple instances of downstream subscribers replicated to handle the volume" 是什么意思?
来自文档...广播将元素发射(发送)给每个消费者。 balance 只发送给第一个可用的消费者。
Emit each incoming element each of n outputs.
Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer.
根据评论编辑:
根据您的要点,您应该制作两个 averageCarrierDelay 函数,每个函数对应 Z
和 F
。然后就可以看到发送给每个的所有元素了。
val averageCarrierDelayZ =
Flow[FlightDelayRecord]
.groupBy(30, _.uniqueCarrier)
.fold(("", 0, 0)){
(x: (String, Int, Int), y:FlightDelayRecord) => {
println(s"Z Received Element: ${y}")
val count = x._2 + 1
val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
(y.uniqueCarrier, count, totalMins)
}
}.mergeSubstreams
val averageCarrierDelayF =
Flow[FlightDelayRecord]
.groupBy(30, _.uniqueCarrier)
.fold(("", 0, 0)){
(x: (String, Int, Int), y:FlightDelayRecord) => {
println(s"F Received Element: ${y}")
val count = x._2 + 1
val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
(y.uniqueCarrier, count, totalMins)
}
}.mergeSubstreams
编辑 2:为了在将来进行检查,我建议为流阶段使用一个通用记录器,这样您就可以看到发生了什么。
def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a }
这样做可以让您做类似的事情:
D ~> logElement[FlightDelayRecord]("F received: ") ~> F
D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z
通过这种方式,您可以检查图表区域是否存在您可能预料到或可能不预料到的奇怪行为。
正如其他人已经说过的,broadcast 将其输入发送到 all 个输出端口,而 balance 将其输入发送到 one 个输出端口关于背压。
当您使用GraphStage
时,您需要选择您要使用的输出端口。考虑这个例子:
val q1 = Source.queue[Int](10, OverflowStrategy.fail)
val q2 = Source.queue[Int](10, OverflowStrategy.fail)
GraphDSL.create(q1, q2)(Keep.both) { implicit b => (input1, input2) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Int](2))
val balance = b.add(Balance[Int](2))
val consumer1, consumer2, consumer3, consumer4 = b.add(Sink.foreach[Int](println))
input1 ~> broadcast.in
input2 ~> balance.in
broadcast.out(0) ~> consumer1
broadcast.out(1) ~> consumer2
balance.out(0) ~> consumer3
balance.out(1) ~> consumer4
ClosedShape
}
在这里,我们将一个输入连接到广播级,将一个输入连接到平衡级。然后我们将广播和平衡阶段的不同输出端口连接到各自的消费者。
在这种特殊情况下,当您 运行 流时,来自第一个输入的元素将同时传递给 consumer1
和 consumer2
,因为广播阶段会复制其输入到它的所有输出(这里有两个输出),通过第二个输入的元素将根据终端的速度(即 [=16 的速度)均匀分布在 consumer3
和 consumer4
=]), 因为Sink.foreach
函数执行时间长了会背压
请注意,我们已指定广播和平衡阶段各有 2 个端口(在调用它们的工厂方法时),并且我们已指定连接到哪个消费者的输出端口。