停止时跳过一部分流
Skip a part of the stream when it's stalled
我有这样一种情况,消息通过一个可能延迟它们的组件传递。
在压力下我想跳过这个组件,这样最多可以同时延迟 X 条消息。溢出的消息将跳过此阶段并进入下一阶段。
消息在此阶段停止,直到它们的未来完成,或最多一分钟,以先到者为准。
我可能可以类似地实现自定义 GraphStage to this buffer example,或者使用 divertTo 和一些计数器来使消息跳过停滞的组件,
但感觉在 akka 流中可能有更简单的方法
我一直在研究你的用例,并提出了一个基于 Akka Actor
代表计数器和异步映射阶段的解决方案:
想法是在任何给定时间最多可以处理 3
个元素,并且基于最大容量为 2
的计数器,我们最多只允许 2
这些元素同时由慢速组件处理。
这样一来,总有一个处理线程保留给上游元素,这些元素将从慢组件中分支出来并直接到达下游。
我们先定义一个基本的Counter
,最大容量为Akka Actor
:
class Counter(max: Int) extends Actor {
private var count: Int = 0
override def receive: Receive = {
case TryAndLock if count < max =>
count += 1
sender ! true
case TryAndLock =>
sender ! false
case Release =>
count -= 1
}
}
sealed trait CounterAction
case object TryAndLock extends CounterAction
case object Release extends CounterAction
val counter = system.actorOf(Props(new Counter(max = 2)))
它包含一个可变的 count
变量,可以通过 TryAndLock
请求递增,但前提是计数尚未达到最大容量,并且可以通过 [=] 递减22=]请求。
我们正在使用 Actor
,以便在没有竞争条件的情况下正确处理来自以下 mapAsync
阶段的并发锁定和释放操作。
那么,只需要使用 mapAsyncUnordered
阶段,其并行度仅比计数器的最大容量高出 1 个单位。
任何通过异步阶段的元素都会查询 Counter
以尝试锁定资源。如果资源已被锁定,则该元素将被扔到慢速组件中。如果没有,它将跳过它。元素被传递到慢速组件,直到我们达到计数器的最大容量,此时任何新元素都将被跳过,直到元素退出慢速组件并从计数器释放资源。
我们不能简单地使用 mapAsync
因为元素在存在阶段时会保持其上游的顺序,使得跳过的元素等待慢速组件处理的元素在下游生产之前完成。因此有必要使用 mapAsyncUnordered
代替。
我们定义一个例子,慢速组件最多同时处理2个元素和并行度为3的异步映射:
Source(0 to 15)
.throttle(1, 50.milliseconds)
.mapAsyncUnordered(parallelism = 3) { i =>
(counter ? TryAndLock).map {
case locked: Boolean if locked =>
val result = slowTask(i)
counter ! Release
result
case _ =>
skipTask(i)
}
}
.runForeach(println)
例如这两个函数将模拟慢速组件 (slowTask
) 以及跳过慢速组件时要做什么 (skipTask
):
def slowTask(value: Int): String = {
val start = Instant.now()
Thread.sleep(250)
s"$value - processed - $start - ${Instant.now()}"
}
def skipTask(value: Int): String =
s"$value - skipped - ${Instant.now()}"
结果类似于:
2 - skipped - 2020-06-03T19:07:19.410Z
3 - skipped - 2020-06-03T19:07:19.468Z
4 - skipped - 2020-06-03T19:07:19.518Z
5 - skipped - 2020-06-03T19:07:19.569Z
1 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
0 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
8 - skipped - 2020-06-03T19:07:19.719Z
9 - skipped - 2020-06-03T19:07:19.769Z
10 - skipped - 2020-06-03T19:07:19.819Z
6 - processed - 2020-06-03T19:07:19.618Z - 2020-06-03T19:07:19.869Z
12 - skipped - 2020-06-03T19:07:19.919Z
7 - processed - 2020-06-03T19:07:19.669Z - 2020-06-03T19:07:19.921Z
14 - skipped - 2020-06-03T19:07:20.019Z
15 - skipped - 2020-06-03T19:07:20.070Z
11 - processed - 2020-06-03T19:07:19.869Z - 2020-06-03T19:07:20.122Z
13 - processed - 2020-06-03T19:07:19.968Z - 2020-06-03T19:07:20.219Z
其中第一部分是上游元素的索引,第二部分是应用元素的转换(进入慢速组件时 processed
或 skipped
)和最后一个部分是一个时间戳,以便我们可视化事情发生的时间。
进入阶段的前 2 个元素(0 和 1)由慢组件处理,随后的一堆元素(2、3、4 和 5)跳过慢阶段,直到这 2 个前元素完成并且附加元素可以进入慢速阶段。等等。
吉尔福伊尔
我有这样一种情况,消息通过一个可能延迟它们的组件传递。
在压力下我想跳过这个组件,这样最多可以同时延迟 X 条消息。溢出的消息将跳过此阶段并进入下一阶段。
消息在此阶段停止,直到它们的未来完成,或最多一分钟,以先到者为准。
我可能可以类似地实现自定义 GraphStage to this buffer example,或者使用 divertTo 和一些计数器来使消息跳过停滞的组件, 但感觉在 akka 流中可能有更简单的方法
我一直在研究你的用例,并提出了一个基于 Akka Actor
代表计数器和异步映射阶段的解决方案:
想法是在任何给定时间最多可以处理 3
个元素,并且基于最大容量为 2
的计数器,我们最多只允许 2
这些元素同时由慢速组件处理。
这样一来,总有一个处理线程保留给上游元素,这些元素将从慢组件中分支出来并直接到达下游。
我们先定义一个基本的Counter
,最大容量为Akka Actor
:
class Counter(max: Int) extends Actor {
private var count: Int = 0
override def receive: Receive = {
case TryAndLock if count < max =>
count += 1
sender ! true
case TryAndLock =>
sender ! false
case Release =>
count -= 1
}
}
sealed trait CounterAction
case object TryAndLock extends CounterAction
case object Release extends CounterAction
val counter = system.actorOf(Props(new Counter(max = 2)))
它包含一个可变的 count
变量,可以通过 TryAndLock
请求递增,但前提是计数尚未达到最大容量,并且可以通过 [=] 递减22=]请求。
我们正在使用 Actor
,以便在没有竞争条件的情况下正确处理来自以下 mapAsync
阶段的并发锁定和释放操作。
那么,只需要使用 mapAsyncUnordered
阶段,其并行度仅比计数器的最大容量高出 1 个单位。
任何通过异步阶段的元素都会查询 Counter
以尝试锁定资源。如果资源已被锁定,则该元素将被扔到慢速组件中。如果没有,它将跳过它。元素被传递到慢速组件,直到我们达到计数器的最大容量,此时任何新元素都将被跳过,直到元素退出慢速组件并从计数器释放资源。
我们不能简单地使用 mapAsync
因为元素在存在阶段时会保持其上游的顺序,使得跳过的元素等待慢速组件处理的元素在下游生产之前完成。因此有必要使用 mapAsyncUnordered
代替。
我们定义一个例子,慢速组件最多同时处理2个元素和并行度为3的异步映射:
Source(0 to 15)
.throttle(1, 50.milliseconds)
.mapAsyncUnordered(parallelism = 3) { i =>
(counter ? TryAndLock).map {
case locked: Boolean if locked =>
val result = slowTask(i)
counter ! Release
result
case _ =>
skipTask(i)
}
}
.runForeach(println)
例如这两个函数将模拟慢速组件 (slowTask
) 以及跳过慢速组件时要做什么 (skipTask
):
def slowTask(value: Int): String = {
val start = Instant.now()
Thread.sleep(250)
s"$value - processed - $start - ${Instant.now()}"
}
def skipTask(value: Int): String =
s"$value - skipped - ${Instant.now()}"
结果类似于:
2 - skipped - 2020-06-03T19:07:19.410Z
3 - skipped - 2020-06-03T19:07:19.468Z
4 - skipped - 2020-06-03T19:07:19.518Z
5 - skipped - 2020-06-03T19:07:19.569Z
1 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
0 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
8 - skipped - 2020-06-03T19:07:19.719Z
9 - skipped - 2020-06-03T19:07:19.769Z
10 - skipped - 2020-06-03T19:07:19.819Z
6 - processed - 2020-06-03T19:07:19.618Z - 2020-06-03T19:07:19.869Z
12 - skipped - 2020-06-03T19:07:19.919Z
7 - processed - 2020-06-03T19:07:19.669Z - 2020-06-03T19:07:19.921Z
14 - skipped - 2020-06-03T19:07:20.019Z
15 - skipped - 2020-06-03T19:07:20.070Z
11 - processed - 2020-06-03T19:07:19.869Z - 2020-06-03T19:07:20.122Z
13 - processed - 2020-06-03T19:07:19.968Z - 2020-06-03T19:07:20.219Z
其中第一部分是上游元素的索引,第二部分是应用元素的转换(进入慢速组件时 processed
或 skipped
)和最后一个部分是一个时间戳,以便我们可视化事情发生的时间。
进入阶段的前 2 个元素(0 和 1)由慢组件处理,随后的一堆元素(2、3、4 和 5)跳过慢阶段,直到这 2 个前元素完成并且附加元素可以进入慢速阶段。等等。
吉尔福伊尔