bublik42 bublik42 - 3 months ago 13
Scala Question

Failure retries with Finagle

I'm struggling with Finagle client retries.
For some reason, client doesn't retry failed requests in test, even though I use a custom classifier, which should mark any response code other than 200 as RetryableFailure. I have tried bulding client both with ClientBuilder and with Http.client.

I narrowed down my code and test to only the necessary bit.

Please take a look:

MyApi.scala:

import io.circe._, io.circe.generic.auto._, io.circe.jawn._, io.circe.syntax._
import com.twitter.finagle.http._
import com.twitter.finagle.Service
import com.twitter.finagle.Codec
import com.twitter.finagle.builder._
import scala.concurrent.ExecutionContext.Implicits.global
import com.twitter.conversions.time._
import com.twitter.finagle.http.service.HttpResponseClassifier
import com.twitter.finagle.service._
import com.twitter.util.Return

class MyApi(val host: String, val protocol: String = "https") {

// def request: Service[Request, Response] = {
// val clientBuilder = ClientBuilder()
// .codec(Http())
// .responseClassifier(classifier)
// .noFailureAccrual
// .requestTimeout(30.seconds)
// .hostConnectionLimit(5)
// .tcpConnectTimeout(5.seconds)
// .retryBudget(budget)
// .retries(5)

// protocol match {
// case "https" => clientBuilder.hosts(s"$host:443").tls(host).build()
// case _ => clientBuilder.hosts(host).build()
// }
// }

val budget: RetryBudget = RetryBudget(
ttl = 10.seconds,
minRetriesPerSec = 5,
percentCanRetry = 0.1
)

val classifier: ResponseClassifier = {
case ReqRep(_, Return(r: Response)) if r.statusCode == 200 =>
ResponseClass.Success
case _ => ResponseClass.RetryableFailure
}

def request: Service[Request, Response] = com.twitter.finagle.Http.client
.withResponseClassifier(classifier)
.withRetryBudget(budget)
.withRetryBackoff(Backoff.exponentialJittered(2.seconds, 32.seconds))
.newService(host)

def requestUsers = request(users(1))

val users: (Int => Request) = (n) => RequestBuilder()
.url(s"$protocol://$host/api?results=$n")
.buildGet()
}

object MyApi {
def apply(host: String, protocol: String) = new MyApi(host, protocol)
}


MyApiSpec.scala:

import com.twitter.util.Return
import com.twitter.finagle.http.service.HttpResponseClassifier
import org.scalatest._
import org.scalatest.concurrent._
import org.scalamock.proxy.ProxyMockFactory
import scala.language.postfixOps
import com.twitter.util.{Await}
import org.scalamock.scalatest.MockFactory
import com.twitter.finagle.http._
import org.scalamock.proxy.ProxyMockFactory
import io.finch._
import com.twitter.finagle.Http
import scala.io.Source
import com.twitter.io.{Reader, Buf}
import com.twitter.finagle.ListeningServer

class MyApiSpec extends FlatSpec with Matchers with MockFactory with ScalaFutures with ProxyMockFactory with BeforeAndAfter with BeforeAndAfterAll {

var server: ListeningServer = _
val ru = MyApi("localhost:1490", "http")

after {
server.close()
}

var attempts = 0
val failure: Endpoint[String] = get("api") {
println(attempts)
if (attempts > 1) {
Ok("test message")
}
else {
attempts += 1
BadGateway(new Exception("try again"))
}
}

it should "avoid network issues by retrying" in {
server = Http.server
.serve(":1490", failure.toServiceAs[Text.Plain])

val users = Await.result(ru.requestUsers).contentString
assert(users == "test message")
}
}


build.sbt:

name := "myapi"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"
libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.2"

val circeVersion = "0.4.1"

libraryDependencies ++= Seq(
"io.circe" %% "circe-core",
"io.circe" %% "circe-generic",
"io.circe" %% "circe-parser"
).map(_ % circeVersion)

libraryDependencies ++= Seq(
"com.github.finagle" %% "finch-core" % "0.11.0-M2",
"com.github.finagle" %% "finch-circe" % "0.11.0-M2"
)

libraryDependencies += "com.typesafe" % "config" % "1.3.0"
libraryDependencies += "com.typesafe.slick" %% "slick" % "3.1.1"
libraryDependencies += "com.typesafe.slick" %% "slick-hikaricp" % "3.1.0"
libraryDependencies += "com.typesafe.slick" %% "slick-testkit" % "3.1.1" % "test"


libraryDependencies += "org.postgresql" % "postgresql" % "9.4-1206-jdbc4"
libraryDependencies += "junit" % "junit" % "4.8.1" % "test"

libraryDependencies += "org.scalamock" %% "scalamock-scalatest-support" % "3.2.2" % "test"

val scalazVersion = "7.2.4"

libraryDependencies ++= Seq(
"org.scalaz" %% "scalaz-core" % scalazVersion,
"org.scalaz" %% "scalaz-effect" % scalazVersion,
"org.scalaz" %% "scalaz-scalacheck-binding" % scalazVersion % "test"
)

scalacOptions += "-feature"

initialCommands in console := "import scalaz._, Scalaz._"

Answer

Sorry, it's so confusing. Finagle doesn't retry application-level failures (set via ResponseClassifiers) by default so you'd need to enable retries explicitly (see Retries section in the user guide).

Try building something between the lines:

import com.twitter.conversions.time._
import com.twitter.finagle.Http
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finagle.util.DefaultTimer
import com.twitter.finagle.service.{Backoff, RetryFilter}
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.util._

implicit val t = DefaultTimer
val twitter = Http.client.newService("twitter.com")
// 3 retries, backoff 1 second
val retry = RetryFilter(Backoff.const(1.second).take(3)) {
  case (_, Return(rep)) => rep.status != 200
}

val retryTwitter = retry.andThen(twitter)
Comments