这个 Akka Kafka Stream 配置是否受益于 Akka Streams 的背压机制?
Is this Akka Kafka Stream configuration benefits from Back Pressure mechanism of the Akka Streams?
我们有一个 Akka 应用程序,它使用 Kafka 主题并将接收到的消息发送给 Akka Actor。我不确定我的编程方式是否利用了 Akka Streams 中内置的背压机制的所有优势。
以下是我的配置...
val control : Consumer.DrainingControl[Done]
Consumer
.sourceWitOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
.map(consumerRecord =>
val myAvro = consumerRecord.value().asInstanceOf[MyAvro]
val myActor = AkkaSystem.sharding.getMyActor(myAvro.getId)
myActor ! Update(myAvro)
)
.via(Commiter.flowWithOffsetContext(CommitterSettings(AkkaSystem.system.toClassic)))
.toMat(Sink.ignore)(Consumer.DrainControl.apply)
.run()
这符合我的业务案例预期,myActor 收到命令更新 (MyAvro)
我对 Back Pressure 的技术概念更恼火,据我所知,Back Pressure 机制部分由 Sinks 控制,但在这种 Stream 配置中,我的 Sink 只有 'Sink.ignore'。所以我的水槽正在为背压做任何事情。
我也很好奇Akka Kafka Stream提交Kafka Topic偏移量的时候?命令发送到 MyActor 邮箱的那一刻?如果是这样,那么我如何处理询问模式等场景,Kafka Offset 在询问模式完成之前不应提交。
我看到一些处理手动偏移控制的工厂方法 'plainPartitionedManualOffsetSource','commitablePartitionManualOffsetSource' 但我找不到任何示例,我可以根据我的业务逻辑决定手动提交偏移吗?
作为替代配置,我可以使用类似的东西。
val myActor: ActorRef[MyActor.Command] = AkkaSystem.sharding.getMyActor
val (control, result) =
Consumer
.plainSource(consumerSettings, Subscriptions.topics("myTopic"))
.toMat(Sink.actorRef(AkkaSystem.sharding.getMyActor(myAvro.getId), null))(Keep.both)
.run()
现在我可以访问Sink.actorRef,我认为Back Pressure机制有机会控制Back Pressure,自然这段代码不会起作用,因为我不知道如何访问'myAvro'这个星座。
感谢解答..
在第一个流中,基本不会有背压。偏移量提交将在消息发送到 myActor
.
后很快发生
对于背压,您需要等待目标参与者的响应,正如您所说,询问模式是实现这一目标的规范方式。由于来自 actor 外部的 actor 的请求(出于所有意图和目的,流在 actor 外部:阶段由 actor 执行是一个实现细节)导致 Future
,这表明 mapAsync
被要求。
def askUpdate(m: MyAvro): Future[Response] = ??? // get actorref from cluster sharding, send ask, etc.
然后您可以将原始流中的 map
替换为
.mapAsync(parallelism) { consumerRecord =>
askUpdate(consumeRecord.value().asInstanceOf[MyAvro])
}
mapAsync
将“飞行”期货限制为 parallelism
。如果有 parallelism
个期货(当然是由它产生的),它将背压。如果生成的未来以失败告终(对于请求本身,这通常是超时),它将失败;成功期货的结果(关于传入订单)将被传递(通常,这些将是 akka.Done
,尤其是当流中唯一剩下要做的事情是偏移提交和 Sink.ignore
时)。
此说法不正确:
... as much as I can understand, Back Pressure mechanism are controlled partially from Sinks but in this Stream configuration, my Sink is only 'Sink.ignore'. So my Sink is doing anything for Back Pressure.
Sink
s 的背压没有什么特别之处。背压作为流量控制机制将自动在流中存在异步边界的任何地方使用。这可能在 Sink
中,但也可能在流中的其他任何地方。
在你的情况下,你正在连接你的流与演员交谈。那是你的异步边界,但你这样做的方式是使用 map
并且在该地图中你使用 !
与演员交谈。所以没有背压,因为:
map
不是异步运算符,它内部调用的任何内容都不能参与背压机制。所以从 Akka Stream 的角度来看,没有引入异步边界。
!
是一劳永逸,没有提供关于 actor 执行任何背压有多忙的反馈。
就像 Levi 提到的那样,您可以做的是从 tell
更改为 ask
交互,并让接收 actor 在其工作完成时做出响应。然后你可以像 Levi 描述的那样使用 mapAsync
。 map
和 mapAsync
之间的区别在于 mapAsync
的语义是这样的,它只会在返回的 Future
完成时向下游发出。即使 parallelism
为 1,背压仍然有效。如果你的 Kafka 记录来得比你的 actor 可以处理的快得多,mapAsync
会在等待 Future
完成时对上游进行背压。 在这种特殊情况下,我认为增加 parallelism
没有任何意义,因为所有这些消息都将添加到演员的收件箱中,因此这样做不会真正加快任何速度。如果交互是 REST 调用,那么它可以提高整体吞吐量 。根据您的参与者处理消息的方式,为 mapAsync
增加 parallelism
可能会导致吞吐量增加。 paralleslism
值有效地限制了反压开始前允许的最大未完成 Future
秒数。
我们有一个 Akka 应用程序,它使用 Kafka 主题并将接收到的消息发送给 Akka Actor。我不确定我的编程方式是否利用了 Akka Streams 中内置的背压机制的所有优势。
以下是我的配置...
val control : Consumer.DrainingControl[Done]
Consumer
.sourceWitOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
.map(consumerRecord =>
val myAvro = consumerRecord.value().asInstanceOf[MyAvro]
val myActor = AkkaSystem.sharding.getMyActor(myAvro.getId)
myActor ! Update(myAvro)
)
.via(Commiter.flowWithOffsetContext(CommitterSettings(AkkaSystem.system.toClassic)))
.toMat(Sink.ignore)(Consumer.DrainControl.apply)
.run()
这符合我的业务案例预期,myActor 收到命令更新 (MyAvro)
我对 Back Pressure 的技术概念更恼火,据我所知,Back Pressure 机制部分由 Sinks 控制,但在这种 Stream 配置中,我的 Sink 只有 'Sink.ignore'。所以我的水槽正在为背压做任何事情。
我也很好奇Akka Kafka Stream提交Kafka Topic偏移量的时候?命令发送到 MyActor 邮箱的那一刻?如果是这样,那么我如何处理询问模式等场景,Kafka Offset 在询问模式完成之前不应提交。
我看到一些处理手动偏移控制的工厂方法 'plainPartitionedManualOffsetSource','commitablePartitionManualOffsetSource' 但我找不到任何示例,我可以根据我的业务逻辑决定手动提交偏移吗?
作为替代配置,我可以使用类似的东西。
val myActor: ActorRef[MyActor.Command] = AkkaSystem.sharding.getMyActor
val (control, result) =
Consumer
.plainSource(consumerSettings, Subscriptions.topics("myTopic"))
.toMat(Sink.actorRef(AkkaSystem.sharding.getMyActor(myAvro.getId), null))(Keep.both)
.run()
现在我可以访问Sink.actorRef,我认为Back Pressure机制有机会控制Back Pressure,自然这段代码不会起作用,因为我不知道如何访问'myAvro'这个星座。
感谢解答..
在第一个流中,基本不会有背压。偏移量提交将在消息发送到 myActor
.
对于背压,您需要等待目标参与者的响应,正如您所说,询问模式是实现这一目标的规范方式。由于来自 actor 外部的 actor 的请求(出于所有意图和目的,流在 actor 外部:阶段由 actor 执行是一个实现细节)导致 Future
,这表明 mapAsync
被要求。
def askUpdate(m: MyAvro): Future[Response] = ??? // get actorref from cluster sharding, send ask, etc.
然后您可以将原始流中的 map
替换为
.mapAsync(parallelism) { consumerRecord =>
askUpdate(consumeRecord.value().asInstanceOf[MyAvro])
}
mapAsync
将“飞行”期货限制为 parallelism
。如果有 parallelism
个期货(当然是由它产生的),它将背压。如果生成的未来以失败告终(对于请求本身,这通常是超时),它将失败;成功期货的结果(关于传入订单)将被传递(通常,这些将是 akka.Done
,尤其是当流中唯一剩下要做的事情是偏移提交和 Sink.ignore
时)。
此说法不正确:
... as much as I can understand, Back Pressure mechanism are controlled partially from Sinks but in this Stream configuration, my Sink is only 'Sink.ignore'. So my Sink is doing anything for Back Pressure.
Sink
s 的背压没有什么特别之处。背压作为流量控制机制将自动在流中存在异步边界的任何地方使用。这可能在 Sink
中,但也可能在流中的其他任何地方。
在你的情况下,你正在连接你的流与演员交谈。那是你的异步边界,但你这样做的方式是使用 map
并且在该地图中你使用 !
与演员交谈。所以没有背压,因为:
map
不是异步运算符,它内部调用的任何内容都不能参与背压机制。所以从 Akka Stream 的角度来看,没有引入异步边界。!
是一劳永逸,没有提供关于 actor 执行任何背压有多忙的反馈。
就像 Levi 提到的那样,您可以做的是从 tell
更改为 ask
交互,并让接收 actor 在其工作完成时做出响应。然后你可以像 Levi 描述的那样使用 mapAsync
。 map
和 mapAsync
之间的区别在于 mapAsync
的语义是这样的,它只会在返回的 Future
完成时向下游发出。即使 parallelism
为 1,背压仍然有效。如果你的 Kafka 记录来得比你的 actor 可以处理的快得多,mapAsync
会在等待 Future
完成时对上游进行背压。 在这种特殊情况下,我认为增加 。根据您的参与者处理消息的方式,为 parallelism
没有任何意义,因为所有这些消息都将添加到演员的收件箱中,因此这样做不会真正加快任何速度。如果交互是 REST 调用,那么它可以提高整体吞吐量 mapAsync
增加 parallelism
可能会导致吞吐量增加。 paralleslism
值有效地限制了反压开始前允许的最大未完成 Future
秒数。