超时 Akka 流

Timeout Akka Streams Flow

我正在尝试在 akka 流中使用 completionTimeout。我提供了一个人为的示例,其中流程需要 10 秒,但我添加了一个超时为 1 秒的 completionTimeout。我希望此流程在 1 秒后超时。但是,在示例中,流程在 10 秒内完成且没有任何错误。

为什么流不超时?有没有更好的方法让流程超时?

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

class Test extends FlatSpec with Matchers {

  implicit val system = ActorSystem("test")

  "This Test" should "fail but passes and I don't know why" in {

    //This takes 10 seconds to complete
    val flow: Flow[String, String, NotUsed] = Flow[String]
        .map(str => {
          println(s"Processing ${str}")
          Thread.sleep(10000)
        })
        .map(_ => {"Done!"})

    val future: Future[String] =
      Source.single("Input")
        .via(flow)
        .completionTimeout(1 second) // Set a timeout of 1 second
        .runWith(Sink.last)

    val result = Await.result(future, 15 seconds)

    result should be("Done!")
  }
}

在执行给定流时,Akka Stream 利用 operator fusion 通过单个底层参与者融合流运算符以获得最佳性能。为了让你的主线程赶上超时,你可以通过 .async:

引入异步
val future: Future[String] =
  Source.single("Input")
    .via(flow)
    .async  // <--- asynchronous boundary
    .completionTimeout(1 second)
    .runWith(Sink.last)

future.onComplete(println)
// Processing Input
// Failure(java.util.concurrent.TimeoutException: The stream has not been completed in 1 second.)

引入异步的另一种方法是使用 mapAsync 流阶段:

val flow: Flow[String, String, NotUsed] = Flow[String]
  .map(str => {
    println(s"Processing ${str}")
    Thread.sleep(10000)
  })
  .mapAsync(1)(_ => Future("Done!"))  // <--- asynchronous flow stage

尽管出现相同的超时错误,您可能会注意到使用 mapAsync 需要约 10 秒才能看到结果,而使用 async 只需约 1 秒。这是因为虽然 mapAsync 引入了一个异步流程阶段,但它不是异步边界(就像 async 所做的那样)并且仍然受到运算符融合的影响。