如何限制 Akka Stream 每秒只执行一次并发送一条消息?
How to limit an Akka Stream to execute and send down one message only once per second?
我有一个 Akka 流,我希望流大约每秒向下游发送消息。
我尝试了两种方法来解决这个问题,第一种方法是让生产者在流开始时每秒仅在 Continue 消息进入此 actor 时发送一次消息。
// When receive a Continue message in a ActorPublisher
// do work then...
if (totalDemand > 0) {
import scala.concurrent.duration._
context.system.scheduler.scheduleOnce(1 second, self, Continue)
}
这工作了一小段时间,然后大量的 Continue 消息出现在 ActorPublisher actor 中,我假设(猜测但不确定)来自下游通过背压请求消息,因为下游可以快速消耗但上游不是以快速的速度生产。所以这个方法失败了。
我尝试的另一种方法是通过背压控制,我在流末尾的 ActorSubscriber
上使用 MaxInFlightRequestStrategy
将消息数限制为每秒 1 条。这行得通,但每次传入的消息大约为三条左右,而不是一次一条。似乎背压控制不会立即改变消息进入的速率,或者消息已经在流中排队等待处理。
所以问题是,我怎样才能拥有每秒只能处理一条消息的 Akka Stream?
我发现 MaxInFlightRequestStrategy
是一种有效的方法,但我应该将批量大小设置为 1,它的批量大小默认为 5,这导致了我发现的问题。现在我正在查看提交的答案,这也是解决问题的一种过于复杂的方法。
您可以让您的元素通过节流,这将背压一个快速源,或者您可以结合使用 tick
和 zip
。
第一个解决方案是这样的:
val veryFastSource =
Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))
val throttlingFlow = Flow[Long].throttle(
// how many elements do you allow
elements = 1,
// in what unit of time
per = 1.second,
maximumBurst = 0,
// you can also set this to Enforcing, but then your
// stream will collapse if exceeding the number of elements / s
mode = ThrottleMode.Shaping
)
veryFastSource.via(throttlingFlow).runWith(Sink.foreach(println))
第二个解决方案是这样的:
val veryFastSource =
Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))
val tickingSource = Source.tick(1.second, 1.second, 0)
veryFastSource.zip(tickingSource).map(_._1).runWith(Sink.foreach(println))
我有一个 Akka 流,我希望流大约每秒向下游发送消息。
我尝试了两种方法来解决这个问题,第一种方法是让生产者在流开始时每秒仅在 Continue 消息进入此 actor 时发送一次消息。
// When receive a Continue message in a ActorPublisher
// do work then...
if (totalDemand > 0) {
import scala.concurrent.duration._
context.system.scheduler.scheduleOnce(1 second, self, Continue)
}
这工作了一小段时间,然后大量的 Continue 消息出现在 ActorPublisher actor 中,我假设(猜测但不确定)来自下游通过背压请求消息,因为下游可以快速消耗但上游不是以快速的速度生产。所以这个方法失败了。
我尝试的另一种方法是通过背压控制,我在流末尾的 ActorSubscriber
上使用 MaxInFlightRequestStrategy
将消息数限制为每秒 1 条。这行得通,但每次传入的消息大约为三条左右,而不是一次一条。似乎背压控制不会立即改变消息进入的速率,或者消息已经在流中排队等待处理。
所以问题是,我怎样才能拥有每秒只能处理一条消息的 Akka Stream?
我发现 MaxInFlightRequestStrategy
是一种有效的方法,但我应该将批量大小设置为 1,它的批量大小默认为 5,这导致了我发现的问题。现在我正在查看提交的答案,这也是解决问题的一种过于复杂的方法。
您可以让您的元素通过节流,这将背压一个快速源,或者您可以结合使用 tick
和 zip
。
第一个解决方案是这样的:
val veryFastSource =
Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))
val throttlingFlow = Flow[Long].throttle(
// how many elements do you allow
elements = 1,
// in what unit of time
per = 1.second,
maximumBurst = 0,
// you can also set this to Enforcing, but then your
// stream will collapse if exceeding the number of elements / s
mode = ThrottleMode.Shaping
)
veryFastSource.via(throttlingFlow).runWith(Sink.foreach(println))
第二个解决方案是这样的:
val veryFastSource =
Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))
val tickingSource = Source.tick(1.second, 1.second, 0)
veryFastSource.zip(tickingSource).map(_._1).runWith(Sink.foreach(println))