如何重试失败的 akka-http 请求流解组?
How to retry failed Unmarshalling of a stream of akka-http requests?
我知道在 ActorMaterialzer 上使用监督策略可以在出错时重启 akka-stream
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)
来源:http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-error.html
我有以下用例。
/***
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http-experimental" % "2.4.2",
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.2"
)
*/
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import Uri.Query
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.util.{Success, Failure}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.Future
object SO extends DefaultJsonProtocol {
implicit val system = ActorSystem()
import system.dispatcher
implicit val materializer = ActorMaterializer()
val httpFlow = Http().cachedHostConnectionPoolHttps[HttpRequest]("example.org")
def search(query: Char) = {
val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
(request, request)
}
case class Hello(name: String)
implicit val helloFormat = jsonFormat1(Hello)
val searches =
Source('a' to 'z').map(search).via(httpFlow).mapAsync(1){
case (Success(response), _) => Unmarshal(response).to[Hello]
case (Failure(e), _) => Future.failed(e)
}
def main(): Unit = {
Await.result(searches.runForeach(_ => println), Duration.Inf)
()
}
}
有时查询将无法解组。我想对该单个查询使用重试策略
https://example.org/?q=v
无需重新启动整个字母表。
我认为用 supervsior 策略很难(或不可能)实现它,主要是因为你想重试 "n" 次(根据评论中的讨论),我不认为您可以在使用监督时跟踪元素被尝试的次数。
我认为有两种方法可以解决这个问题。要么将有风险的操作作为单独的流处理,要么创建一个图形来进行错误处理。我会提出两个解决方案。
还要注意 Akka Streams distinguishes between errors and failures,所以如果你不处理你的失败,它们最终会崩溃流(如果没有引入策略),所以在下面的例子中我将它们转换为 Either
,代表成功或错误。
独立流
你可以做的是将每个字母视为一个单独的流,并使用重试策略分别处理每个字母的失败,并有一些延迟。
// this comes after your helloFormat
// note that the method is somehow simpler because it's
// using implicit dispatcher and scheduler from outside scope,
// you may also want to pass it as implicit arguments
def retry[T](f: => Future[T], delay: FiniteDuration, c: Int): Future[T] =
f.recoverWith {
// you may want to only handle certain exceptions here...
case ex: Exception if c > 0 =>
println(s"failed - will retry ${c - 1} more times")
akka.pattern.after(delay, system.scheduler)(retry(f, delay, c - 1))
}
val singleElementFlow = httpFlow.mapAsync[Hello](1) {
case (Success(response), _) =>
val f = Unmarshal(response).to[Hello]
f.recoverWith {
case ex: Exception =>
// see https://github.com/akka/akka/issues/20192
response.entity.dataBytes.runWith(Sink.ignore).flatMap(_ => f)
}
case (Failure(e), _) => Future.failed(e)
}
// so the searches can either go ok or not, for each letter, we will retry up to 3 times
val searches =
Source('a' to 'z').map(search).mapAsync[Either[Throwable, Hello]](1) { elem =>
println(s"trying $elem")
retry(
Source.single(elem).via(singleElementFlow).runWith(Sink.head[Hello]),
1.seconds, 3
).map(ok => Right(ok)).recover { case ex => Left(ex) }
}
// end
图表
此方法会将故障整合到图中,并允许重试。这个例子使所有请求 运行 并行并且更愿意重试那些失败的请求,但是如果你不想要这种行为并且 运行 它们一个接一个,我相信你也可以这样做。
// this comes after your helloFormat
// you may need to have your own class if you
// want to propagate failures for example, but we will use
// right value to keep track of how many times we have
// tried the request
type ParseResult = Either[(HttpRequest, Int), Hello]
def search(query: Char): (HttpRequest, (HttpRequest, Int)) = {
val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
(request, (request, 0)) // let's use this opaque value to count how many times we tried to search
}
val g = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val searches = b.add(Flow[Char])
val tryParse =
Flow[(Try[HttpResponse], (HttpRequest, Int))].mapAsync[ParseResult](1) {
case (Success(response), (req, tries)) =>
println(s"trying parse response to $req for $tries")
Unmarshal(response).to[Hello].
map(h => Right(h)).
recoverWith {
case ex: Exception =>
// see https://github.com/akka/akka/issues/20192
response.entity.dataBytes.runWith(Sink.ignore).map { _ =>
Left((req, tries + 1))
}
}
case (Failure(e), _) => Future.failed(e)
}
val broadcast = b.add(Broadcast[ParseResult](2))
val nonErrors = b.add(Flow[ParseResult].collect {
case Right(x) => x
// you may also handle here Lefts which do exceeded retries count
})
val errors = Flow[ParseResult].collect {
case Left(x) if x._2 < 3 => (x._1, x)
}
val merge = b.add(MergePreferred[(HttpRequest, (HttpRequest, Int))](1, eagerComplete = true))
// @formatter:off
searches.map(search) ~> merge ~> httpFlow ~> tryParse ~> broadcast ~> nonErrors
merge.preferred <~ errors <~ broadcast
// @formatter:on
FlowShape(searches.in, nonErrors.out)
}
def main(args: Array[String]): Unit = {
val source = Source('a' to 'z')
val sink = Sink.seq[Hello]
source.via(g).toMat(sink)(Keep.right).run().onComplete {
case Success(seq) =>
println(seq)
case Failure(ex) =>
println(ex)
}
}
基本上这里发生的是我们 运行 搜索 httpFlow
然后尝试解析响应,我们
然后广播结果并拆分错误和非错误,非错误去sink,错误发送
回到循环。如果重试次数超过计数,我们忽略该元素,但你也可以这样做
还有别的东西。
无论如何,我希望这能给你一些想法。
对于上面的流解决方案,对流中最后一个元素的任何重试都不会执行。这是因为当上游在发送最后一个元素后完成时,合并也将完成。在那之后,唯一的输出来自非重试出口,但由于元素去重试,所以也完成了。
如果您需要所有输入元素来生成输出,您将需要一个额外的机制来阻止上游完成到达流程和重试图。一种可能性是使用 BidiFlow,它监控过程和重试图的输入和输出,以确保在传播 oncomplete 之前已生成所有必需的输出(对于观察到的输入)。在简单的情况下,可能只是计算输入和输出元素。
我知道在 ActorMaterialzer 上使用监督策略可以在出错时重启 akka-stream
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)
来源:http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-error.html
我有以下用例。
/***
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http-experimental" % "2.4.2",
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.2"
)
*/
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import Uri.Query
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.util.{Success, Failure}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.Future
object SO extends DefaultJsonProtocol {
implicit val system = ActorSystem()
import system.dispatcher
implicit val materializer = ActorMaterializer()
val httpFlow = Http().cachedHostConnectionPoolHttps[HttpRequest]("example.org")
def search(query: Char) = {
val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
(request, request)
}
case class Hello(name: String)
implicit val helloFormat = jsonFormat1(Hello)
val searches =
Source('a' to 'z').map(search).via(httpFlow).mapAsync(1){
case (Success(response), _) => Unmarshal(response).to[Hello]
case (Failure(e), _) => Future.failed(e)
}
def main(): Unit = {
Await.result(searches.runForeach(_ => println), Duration.Inf)
()
}
}
有时查询将无法解组。我想对该单个查询使用重试策略
https://example.org/?q=v
无需重新启动整个字母表。
我认为用 supervsior 策略很难(或不可能)实现它,主要是因为你想重试 "n" 次(根据评论中的讨论),我不认为您可以在使用监督时跟踪元素被尝试的次数。
我认为有两种方法可以解决这个问题。要么将有风险的操作作为单独的流处理,要么创建一个图形来进行错误处理。我会提出两个解决方案。
还要注意 Akka Streams distinguishes between errors and failures,所以如果你不处理你的失败,它们最终会崩溃流(如果没有引入策略),所以在下面的例子中我将它们转换为 Either
,代表成功或错误。
独立流
你可以做的是将每个字母视为一个单独的流,并使用重试策略分别处理每个字母的失败,并有一些延迟。
// this comes after your helloFormat
// note that the method is somehow simpler because it's
// using implicit dispatcher and scheduler from outside scope,
// you may also want to pass it as implicit arguments
def retry[T](f: => Future[T], delay: FiniteDuration, c: Int): Future[T] =
f.recoverWith {
// you may want to only handle certain exceptions here...
case ex: Exception if c > 0 =>
println(s"failed - will retry ${c - 1} more times")
akka.pattern.after(delay, system.scheduler)(retry(f, delay, c - 1))
}
val singleElementFlow = httpFlow.mapAsync[Hello](1) {
case (Success(response), _) =>
val f = Unmarshal(response).to[Hello]
f.recoverWith {
case ex: Exception =>
// see https://github.com/akka/akka/issues/20192
response.entity.dataBytes.runWith(Sink.ignore).flatMap(_ => f)
}
case (Failure(e), _) => Future.failed(e)
}
// so the searches can either go ok or not, for each letter, we will retry up to 3 times
val searches =
Source('a' to 'z').map(search).mapAsync[Either[Throwable, Hello]](1) { elem =>
println(s"trying $elem")
retry(
Source.single(elem).via(singleElementFlow).runWith(Sink.head[Hello]),
1.seconds, 3
).map(ok => Right(ok)).recover { case ex => Left(ex) }
}
// end
图表
此方法会将故障整合到图中,并允许重试。这个例子使所有请求 运行 并行并且更愿意重试那些失败的请求,但是如果你不想要这种行为并且 运行 它们一个接一个,我相信你也可以这样做。
// this comes after your helloFormat
// you may need to have your own class if you
// want to propagate failures for example, but we will use
// right value to keep track of how many times we have
// tried the request
type ParseResult = Either[(HttpRequest, Int), Hello]
def search(query: Char): (HttpRequest, (HttpRequest, Int)) = {
val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
(request, (request, 0)) // let's use this opaque value to count how many times we tried to search
}
val g = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val searches = b.add(Flow[Char])
val tryParse =
Flow[(Try[HttpResponse], (HttpRequest, Int))].mapAsync[ParseResult](1) {
case (Success(response), (req, tries)) =>
println(s"trying parse response to $req for $tries")
Unmarshal(response).to[Hello].
map(h => Right(h)).
recoverWith {
case ex: Exception =>
// see https://github.com/akka/akka/issues/20192
response.entity.dataBytes.runWith(Sink.ignore).map { _ =>
Left((req, tries + 1))
}
}
case (Failure(e), _) => Future.failed(e)
}
val broadcast = b.add(Broadcast[ParseResult](2))
val nonErrors = b.add(Flow[ParseResult].collect {
case Right(x) => x
// you may also handle here Lefts which do exceeded retries count
})
val errors = Flow[ParseResult].collect {
case Left(x) if x._2 < 3 => (x._1, x)
}
val merge = b.add(MergePreferred[(HttpRequest, (HttpRequest, Int))](1, eagerComplete = true))
// @formatter:off
searches.map(search) ~> merge ~> httpFlow ~> tryParse ~> broadcast ~> nonErrors
merge.preferred <~ errors <~ broadcast
// @formatter:on
FlowShape(searches.in, nonErrors.out)
}
def main(args: Array[String]): Unit = {
val source = Source('a' to 'z')
val sink = Sink.seq[Hello]
source.via(g).toMat(sink)(Keep.right).run().onComplete {
case Success(seq) =>
println(seq)
case Failure(ex) =>
println(ex)
}
}
基本上这里发生的是我们 运行 搜索 httpFlow
然后尝试解析响应,我们
然后广播结果并拆分错误和非错误,非错误去sink,错误发送
回到循环。如果重试次数超过计数,我们忽略该元素,但你也可以这样做
还有别的东西。
无论如何,我希望这能给你一些想法。
对于上面的流解决方案,对流中最后一个元素的任何重试都不会执行。这是因为当上游在发送最后一个元素后完成时,合并也将完成。在那之后,唯一的输出来自非重试出口,但由于元素去重试,所以也完成了。
如果您需要所有输入元素来生成输出,您将需要一个额外的机制来阻止上游完成到达流程和重试图。一种可能性是使用 BidiFlow,它监控过程和重试图的输入和输出,以确保在传播 oncomplete 之前已生成所有必需的输出(对于观察到的输入)。在简单的情况下,可能只是计算输入和输出元素。