使用 Finagle 重试失败

Failure retries with Finagle

我正在为 Finagle 客户端重试而苦苦挣扎。 出于某种原因,客户端不会在测试中重试失败的请求,即使我使用自定义分类器,它应该将 200 以外的任何响应代码标记为 RetryableFailure。我已经尝试使用 ClientBuilder 和 Http.client.

构建客户端

我缩小了我的代码范围并只测试了必要的部分。

请看:

MyApi.scala:

import io.circe._, io.circe.generic.auto._, io.circe.jawn._, io.circe.syntax._
import com.twitter.finagle.http._
import com.twitter.finagle.Service
import com.twitter.finagle.Codec
import com.twitter.finagle.builder._
import scala.concurrent.ExecutionContext.Implicits.global
import com.twitter.conversions.time._
import com.twitter.finagle.http.service.HttpResponseClassifier
import com.twitter.finagle.service._
import com.twitter.util.Return

class MyApi(val host: String, val protocol: String = "https") {

  // def request: Service[Request, Response] = {
  //   val clientBuilder = ClientBuilder()
  //     .codec(Http())
  //     .responseClassifier(classifier)
  //     .noFailureAccrual
  //     .requestTimeout(30.seconds)
  //     .hostConnectionLimit(5)
  //     .tcpConnectTimeout(5.seconds)
  //     .retryBudget(budget)
  //     .retries(5)

  //   protocol match {
  //     case "https" => clientBuilder.hosts(s"$host:443").tls(host).build()
  //     case _ => clientBuilder.hosts(host).build()
  //   }
  // }

  val budget: RetryBudget = RetryBudget(
    ttl = 10.seconds,
    minRetriesPerSec = 5,
    percentCanRetry = 0.1
  )

  val classifier: ResponseClassifier = {
    case ReqRep(_, Return(r: Response)) if r.statusCode == 200 =>
      ResponseClass.Success
    case _ => ResponseClass.RetryableFailure
  }

  def request: Service[Request, Response] = com.twitter.finagle.Http.client
    .withResponseClassifier(classifier)
    .withRetryBudget(budget)
    .withRetryBackoff(Backoff.exponentialJittered(2.seconds, 32.seconds))
    .newService(host)

  def requestUsers = request(users(1))

  val users: (Int => Request) = (n) => RequestBuilder()
    .url(s"$protocol://$host/api?results=$n")
    .buildGet()
}

object MyApi {
  def apply(host: String, protocol: String) = new MyApi(host, protocol)
}

MyApiSpec.scala:

import com.twitter.util.Return
import com.twitter.finagle.http.service.HttpResponseClassifier
import org.scalatest._
import org.scalatest.concurrent._
import org.scalamock.proxy.ProxyMockFactory
import scala.language.postfixOps
import com.twitter.util.{Await}
import org.scalamock.scalatest.MockFactory
import com.twitter.finagle.http._
import org.scalamock.proxy.ProxyMockFactory
import io.finch._
import com.twitter.finagle.Http
import scala.io.Source
import com.twitter.io.{Reader, Buf}
import com.twitter.finagle.ListeningServer

class MyApiSpec extends FlatSpec with Matchers with MockFactory with ScalaFutures with ProxyMockFactory with BeforeAndAfter with BeforeAndAfterAll {

  var server: ListeningServer = _
  val ru = MyApi("localhost:1490", "http")

  after {
    server.close()
  }

  var attempts = 0
  val failure: Endpoint[String] = get("api") {
    println(attempts)
    if (attempts > 1) {
      Ok("test message")
    }
    else {
      attempts += 1
      BadGateway(new Exception("try again"))
    }
  }

  it should "avoid network issues by retrying" in {
    server = Http.server
      .serve(":1490", failure.toServiceAs[Text.Plain])

    val users = Await.result(ru.requestUsers).contentString
    assert(users == "test message")
  }
}

build.sbt:

name := "myapi"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"
libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.2"

val circeVersion = "0.4.1"

libraryDependencies ++= Seq(
  "io.circe" %% "circe-core",
  "io.circe" %% "circe-generic",
  "io.circe" %% "circe-parser"
).map(_ % circeVersion)

libraryDependencies ++= Seq(
  "com.github.finagle" %% "finch-core" % "0.11.0-M2",
  "com.github.finagle" %% "finch-circe" % "0.11.0-M2"
)

libraryDependencies += "com.typesafe" % "config" % "1.3.0"
libraryDependencies += "com.typesafe.slick" %% "slick" % "3.1.1"
libraryDependencies += "com.typesafe.slick" %% "slick-hikaricp" % "3.1.0"
libraryDependencies += "com.typesafe.slick" %% "slick-testkit" % "3.1.1" % "test"


libraryDependencies += "org.postgresql" % "postgresql" % "9.4-1206-jdbc4"
libraryDependencies += "junit" % "junit" % "4.8.1" % "test"

libraryDependencies += "org.scalamock" %% "scalamock-scalatest-support" % "3.2.2" % "test"

val scalazVersion = "7.2.4"

libraryDependencies ++= Seq(
  "org.scalaz" %% "scalaz-core" % scalazVersion,
  "org.scalaz" %% "scalaz-effect" % scalazVersion,
  "org.scalaz" %% "scalaz-scalacheck-binding" % scalazVersion % "test"
)

scalacOptions += "-feature"

initialCommands in console := "import scalaz._, Scalaz._"

抱歉,太混乱了。默认情况下,Finagle 不会重试应用程序级故障(通过 ResponseClassifiers 设置),因此您需要明确启用重试(请参阅用户指南中的 Retries 部分)。

尝试在字里行间构建一些东西:

import com.twitter.conversions.time._
import com.twitter.finagle.Http
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finagle.util.DefaultTimer
import com.twitter.finagle.service.{Backoff, RetryFilter}
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.util._
import com.twitter.finagle.param.HighResTimer

val twitter = Http.client.newService("twitter.com")

val shouldRetry: PartialFunction [(Request, Try[Response]), Boolean] = {
  case (_, Return(rep)) => rep.status != 200
}

implicit val t = HighResTimer.Default
// 3 retries, backoff 1 second
val retry = RetryFilter(Backoff.const(1.second).take(3))(shouldRetry)

val retryTwitter = retry.andThen(twitter)