停止时跳过一部分流

Skip a part of the stream when it's stalled

我有这样一种情况,消息通过一个可能延迟它们的组件传递。

在压力下我想跳过这个组件,这样最多可以同时延迟 X 条消息。溢出的消息将跳过此阶段并进入下一阶段。

消息在此阶段停止,直到它们的未来完成,或最多一分钟,以先到者为准。

我可能可以类似地实现自定义 GraphStage to this buffer example,或者使用 divertTo 和一些计数器来使消息跳过停滞的组件, 但感觉在 akka 流中可能有更简单的方法

我一直在研究你的用例,并提出了一个基于 Akka Actor 代表计数器和异步映射阶段的解决方案:

想法是在任何给定时间最多可以处理 3 个元素,并且基于最大容量为 2 的计数器,我们最多只允许 2 这些元素同时由慢速组件处理。

这样一来,总有一个处理线程保留给上游元素,这些元素将从慢组件中分支出来并直接到达下游。


我们先定义一个基本的Counter,最大容量为Akka Actor:

class Counter(max: Int) extends Actor {
  private var count: Int = 0

  override def receive: Receive = {
    case TryAndLock if count < max =>
      count += 1
      sender ! true
    case TryAndLock =>
      sender ! false
    case Release =>
      count -= 1
  }
}

sealed trait CounterAction
case object TryAndLock extends CounterAction
case object Release extends CounterAction

val counter = system.actorOf(Props(new Counter(max = 2)))

它包含一个可变的 count 变量,可以通过 TryAndLock 请求递增,但前提是计数尚未达到最大容量,并且可以通过 [=] 递减22=]请求。

我们正在使用 Actor,以便在没有竞争条件的情况下正确处理来自以下 mapAsync 阶段的并发锁定和释放操作。


那么,只需要使用 mapAsyncUnordered 阶段,其并行度仅比计数器的最大容量高出 1 个单位。

任何通过异步阶段的元素都会查询 Counter 以尝试锁定资源。如果资源已被锁定,则该元素将被扔到慢速组件中。如果没有,它将跳过它。元素被传递到慢速​​组件,直到我们达到计数器的最大容量,此时任何新元素都将被跳过,直到元素退出慢速组件并从计数器释放资源。

我们不能简单地使用 mapAsync 因为元素在存在阶段时会保持其上游的顺序,使得跳过的元素等待慢速组件处理的元素在下游生产之前完成。因此有必要使用 mapAsyncUnordered 代替。

我们定义一个例子,慢速组件最多同时处理2个元素和并行度为3的异步映射:

Source(0 to 15)
  .throttle(1, 50.milliseconds)
  .mapAsyncUnordered(parallelism = 3) { i =>
    (counter ? TryAndLock).map {
      case locked: Boolean if locked =>
        val result = slowTask(i)
        counter ! Release
        result
      case _ =>
        skipTask(i)
    }
  }
  .runForeach(println)

例如这两个函数将模拟慢速组件 (slowTask) 以及跳过慢速组件时要做什么 (skipTask):

def slowTask(value: Int): String = {
  val start = Instant.now()
  Thread.sleep(250)
  s"$value - processed - $start - ${Instant.now()}"
}
def skipTask(value: Int): String =
  s"$value - skipped - ${Instant.now()}"

结果类似于:

2 - skipped - 2020-06-03T19:07:19.410Z
3 - skipped - 2020-06-03T19:07:19.468Z
4 - skipped - 2020-06-03T19:07:19.518Z
5 - skipped - 2020-06-03T19:07:19.569Z
1 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
0 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
8 - skipped - 2020-06-03T19:07:19.719Z
9 - skipped - 2020-06-03T19:07:19.769Z
10 - skipped - 2020-06-03T19:07:19.819Z
6 - processed - 2020-06-03T19:07:19.618Z - 2020-06-03T19:07:19.869Z
12 - skipped - 2020-06-03T19:07:19.919Z
7 - processed - 2020-06-03T19:07:19.669Z - 2020-06-03T19:07:19.921Z
14 - skipped - 2020-06-03T19:07:20.019Z
15 - skipped - 2020-06-03T19:07:20.070Z
11 - processed - 2020-06-03T19:07:19.869Z - 2020-06-03T19:07:20.122Z
13 - processed - 2020-06-03T19:07:19.968Z - 2020-06-03T19:07:20.219Z

其中第一部分是上游元素的索引,第二部分是应用元素的转换(进入慢速组件时 processedskipped)和最后一个部分是一个时间戳,以便我们可视化事情发生的时间。

进入阶段的前 2 个元素(0 和 1)由慢组件处理,随后的一堆元素(2、3、4 和 5)跳过慢阶段,直到这 2 个前元素完成并且附加元素可以进入慢速阶段。等等。

吉尔福伊尔