超时 Akka 流
Timeout Akka Streams Flow
我正在尝试在 akka 流中使用 completionTimeout
。我提供了一个人为的示例,其中流程需要 10 秒,但我添加了一个超时为 1 秒的 completionTimeout
。我希望此流程在 1 秒后超时。但是,在示例中,流程在 10 秒内完成且没有任何错误。
为什么流不超时?有没有更好的方法让流程超时?
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
class Test extends FlatSpec with Matchers {
implicit val system = ActorSystem("test")
"This Test" should "fail but passes and I don't know why" in {
//This takes 10 seconds to complete
val flow: Flow[String, String, NotUsed] = Flow[String]
.map(str => {
println(s"Processing ${str}")
Thread.sleep(10000)
})
.map(_ => {"Done!"})
val future: Future[String] =
Source.single("Input")
.via(flow)
.completionTimeout(1 second) // Set a timeout of 1 second
.runWith(Sink.last)
val result = Await.result(future, 15 seconds)
result should be("Done!")
}
}
在执行给定流时,Akka Stream 利用 operator fusion 通过单个底层参与者融合流运算符以获得最佳性能。为了让你的主线程赶上超时,你可以通过 .async
:
引入异步
val future: Future[String] =
Source.single("Input")
.via(flow)
.async // <--- asynchronous boundary
.completionTimeout(1 second)
.runWith(Sink.last)
future.onComplete(println)
// Processing Input
// Failure(java.util.concurrent.TimeoutException: The stream has not been completed in 1 second.)
引入异步的另一种方法是使用 mapAsync 流阶段:
val flow: Flow[String, String, NotUsed] = Flow[String]
.map(str => {
println(s"Processing ${str}")
Thread.sleep(10000)
})
.mapAsync(1)(_ => Future("Done!")) // <--- asynchronous flow stage
尽管出现相同的超时错误,您可能会注意到使用 mapAsync
需要约 10 秒才能看到结果,而使用 async
只需约 1 秒。这是因为虽然 mapAsync
引入了一个异步流程阶段,但它不是异步边界(就像 async
所做的那样)并且仍然受到运算符融合的影响。
我正在尝试在 akka 流中使用 completionTimeout
。我提供了一个人为的示例,其中流程需要 10 秒,但我添加了一个超时为 1 秒的 completionTimeout
。我希望此流程在 1 秒后超时。但是,在示例中,流程在 10 秒内完成且没有任何错误。
为什么流不超时?有没有更好的方法让流程超时?
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
class Test extends FlatSpec with Matchers {
implicit val system = ActorSystem("test")
"This Test" should "fail but passes and I don't know why" in {
//This takes 10 seconds to complete
val flow: Flow[String, String, NotUsed] = Flow[String]
.map(str => {
println(s"Processing ${str}")
Thread.sleep(10000)
})
.map(_ => {"Done!"})
val future: Future[String] =
Source.single("Input")
.via(flow)
.completionTimeout(1 second) // Set a timeout of 1 second
.runWith(Sink.last)
val result = Await.result(future, 15 seconds)
result should be("Done!")
}
}
在执行给定流时,Akka Stream 利用 operator fusion 通过单个底层参与者融合流运算符以获得最佳性能。为了让你的主线程赶上超时,你可以通过 .async
:
val future: Future[String] =
Source.single("Input")
.via(flow)
.async // <--- asynchronous boundary
.completionTimeout(1 second)
.runWith(Sink.last)
future.onComplete(println)
// Processing Input
// Failure(java.util.concurrent.TimeoutException: The stream has not been completed in 1 second.)
引入异步的另一种方法是使用 mapAsync 流阶段:
val flow: Flow[String, String, NotUsed] = Flow[String]
.map(str => {
println(s"Processing ${str}")
Thread.sleep(10000)
})
.mapAsync(1)(_ => Future("Done!")) // <--- asynchronous flow stage
尽管出现相同的超时错误,您可能会注意到使用 mapAsync
需要约 10 秒才能看到结果,而使用 async
只需约 1 秒。这是因为虽然 mapAsync
引入了一个异步流程阶段,但它不是异步边界(就像 async
所做的那样)并且仍然受到运算符融合的影响。