Akka Streams RestartSink doesn't seem to be restarting during failures

I am handling errors in Akka Streams using restartable sources and sinks.

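import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.RestartSettings
import akka.stream.scaladsl.{RestartSink, RestartSource, Sink, Source}
import scala.concurrent.duration._

object Main extends App {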
  implicit val system: ActorSystem = ActorSystem("akka-streams-system")

  val restartSettings =
    RestartSettings(1.seconds, 10.seconds, 0.2d)

  val restartableSource = RestartSource.onFailuresWithBackoff(restartSettings) {() => {
    Source(0 to 10)
      .map(n =>
        if (n < 5) n.toString
        else throw new RuntimeException("Boom!"))
  }}

  val restartableSink: Sink[String, NotUsed] = RestartSink.withBackoff(restartSettings){
    () => Sink.fold("")((_, newVal) => {
      if(newVal == "3") {
        println(newVal + " Exception")
        throw new RuntimeException("Kabooom!!!") // Trigger a failure, expecting the stream to restart just the sink.
      } else {
        println(newVal + " sink")
      }
      newVal
    })
  }
  restartableSource.runWith(restartableSink)
}

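I am breaking the source and the sink separately to test different scenarios. I break the sink first, expecting it to restart and reprocess the newVal == 3 message over and over. Instead, the error in the sink seems to be discarded and only the source failure is retried, so the source ends up being restarted and re-emits the events starting from 0.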

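I am simulating a scenario where I read from a source (say, a file) and have an HTTP sink retry failed HTTP requests independently, without restarting the whole pipeline.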

The output I get with the code shared above is as follows.

0 sink
1 sink
2 sink
3 Exception
4 sink
[WARN] [01/10/2022 09:13:14.647] [akka-streams-system-akka.actor.default-dispatcher-6] [RestartWithBackoffSource(akka://akka-streams-system)] Restarting stream due to failure [1]: java.lang.RuntimeException: Boom!
java.lang.RuntimeException: Boom!
    at Main$.$anonfun$restartableSource(Main.scala:18)
    at Main$.$anonfun$restartableSource$adapted(Main.scala:16)
    at akka.stream.impl.fusing.Map$$anon.onPush(Ops.scala:52)
    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:542)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:496)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:787)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive.applyOrElse(ActorGraphInterpreter.scala:819)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

I would appreciate help reasoning about why this happens and how to restart the sink independently of the source.

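Your RestartSink is restarting (and not as part of anything else restarting): if it weren't, you would never see 4 sink as output after 3 Exception. For some reason it isn't logging the restart, but that may be due to stream attributes (there have also been some behavior changes in recent months around logging on stream restarts, so the logging may differ depending on which version you're running).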

From the documentation for RestartSink:

The restart process is inherently lossy, since there is no coordination between cancelling and the sending of messages. When the wrapped Sink does cancel, this Sink will backpressure, however any elements already sent may have been lost.

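This is fundamentally because, in the general case, stream stages are memoryless. In your Sink.fold example, the sink restarts with clean state (i.e. ""), and the element that triggered the failure is not replayed. In my experience, this makes RestartSink and RestartFlow less useful than RestartSource.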
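
The clean-state restart can be observed with a small experiment. The following is a minimal sketch, not part of the original question: the object name FoldRestartDemo, the factoryCalls counter, the throttle rate, and the short backoff values are all assumptions chosen for illustration. It counts how often RestartSink invokes the sink factory and prints the fold accumulator for each element.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.RestartSettings
import akka.stream.scaladsl.{RestartSink, Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._

object FoldRestartDemo {
  // Counts how often RestartSink asks the factory for a fresh Sink (illustrative helper).
  val factoryCalls = new AtomicInteger(0)

  // Short backoff values, chosen only to keep the demo quick.
  private val settings = RestartSettings(100.millis, 1.second, 0.2)

  val restartingFold: Sink[String, NotUsed] =
    RestartSink.withBackoff(settings) { () =>
      val run = factoryCalls.incrementAndGet()
      // Each factory call builds a new fold stage that starts from the zero value "".
      Sink.fold[String, String]("") { (acc, elem) =>
        println(s"run=$run acc='$acc' elem=$elem")
        if (elem == "3") throw new RuntimeException("Kabooom!!!")
        acc + elem
      }
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("fold-restart-demo")
    Source(0 to 6).map(_.toString).throttle(1, 200.millis).runWith(restartingFold)
    // The wrapped sink's materialized value is not exposed, so shut down on a timer.
    system.scheduler.scheduleOnce(5.seconds) { system.terminate(); () }(system.dispatcher)
  }
}
```

If the sink behaves as the documentation describes, the lines printed after the failure should carry a higher run number and an accumulator that has started over from the empty string, rather than the value built up before the exception.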

For the use case you describe, I would tend to use a mapAsync stage together with akka.pattern.RetrySupport, sending the HTTP request through a Future-based API and retrying failed requests:

val restartingSource: Source[Element, _] = ???

restartingSource.mapAsync(1) { elem =>
  import akka.pattern.RetrySupport._
  // will need an implicit ExecutionContext and an implicit Scheduler (both are probably best obtained from the ActorSystem)

  val sendRequest = () => {
    // Future-based HTTP call
    ???
  }

  retry(
    attempt = sendRequest,
    attempts = Int.MaxValue,
    minBackoff = 1.seconds,
    maxBackoff = 10.seconds,
    randomFactor = 0.2
  )
}.runWith(Sink.ignore)
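
To make the snippet above concrete, here is a self-contained sketch of the same idea. It is an illustration under stated assumptions rather than a definitive implementation: flakySend is a hypothetical stand-in for a real Future-based HTTP call, and the object name RetryingStage, the attemptsSeen counter, the attempt limit of 5, and the short backoff values are made up for the demo. The implicit ExecutionContext and Scheduler are taken from the ActorSystem, as the comment in the snippet suggests.

```scala
import akka.actor.{ActorSystem, Scheduler}
import akka.pattern.RetrySupport
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object RetryingStage {
  // Total number of send attempts made so far (illustrative helper).
  val attemptsSeen = new AtomicInteger(0)

  // Hypothetical stand-in for an HTTP call: the first two attempts overall fail,
  // every later attempt succeeds. Replace with a real Future-based client call.
  def flakySend(elem: String): Future[String] =
    if (attemptsSeen.incrementAndGet() <= 2)
      Future.failed(new RuntimeException(s"send failed for $elem"))
    else
      Future.successful(s"sent $elem")

  // Retries a single element with exponential backoff; the stream itself is not restarted.
  def sendWithRetry(elem: String)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[String] =
    RetrySupport.retry(
      attempt = () => flakySend(elem),
      attempts = 5,
      minBackoff = 100.millis,
      maxBackoff = 1.second,
      randomFactor = 0.2
    )

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("retry-demo")
    implicit val ec: ExecutionContext = system.dispatcher
    implicit val scheduler: Scheduler = system.scheduler

    Source(List("a", "b", "c"))
      .mapAsync(1)(elem => sendWithRetry(elem)) // parallelism 1 keeps elements in order
      .runWith(Sink.foreach[String](println))
      .onComplete(_ => system.terminate())
  }
}
```

Because the retry happens inside the Future returned to mapAsync, the source only sees backpressure while an element is being retried. If all attempts are exhausted, the Future fails and the stream fails with it, which is the point at which a surrounding RestartSource or a supervision strategy would come into play.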