Akka HTTP REST API 用于生产 Kafka 性能

Akka HTTP REST API for producing to Kafka Performance

我正在构建一个 API 和 Akka 应该生产到 Kafka 总线。我一直在使用 Gatling 对应用程序进行负载测试。注意到当在 Gatling 中创建超过 1000 个用户时,API 开始挣扎。平均而言,每秒处理大约 170 个请求,这对我来说似乎很少。

API 的主要入口点是:

import akka.actor.{Props, ActorSystem}

import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.pattern.ask
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.unmarshalling.Unmarshaller
import akka.stream.ActorMaterializer
import com.typesafe.config.{Config, ConfigFactory}
import play.api.libs.json.{JsObject, Json}

import scala.concurrent.{Future, ExecutionContext}
import akka.http.scaladsl.server.Directives._
import akka.util.Timeout
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

case class PostMsg(msg:JsObject)
case object PostSuccess
case class PostFailure(msg:String)

class Msgapi(conf:Config) {
  implicit val um:Unmarshaller[HttpEntity, JsObject] = {
    Unmarshaller.byteStringUnmarshaller.mapWithCharset { (data, charset) =>
      Json.parse(data.toArray).asInstanceOf[JsObject]
    }
  }
  implicit val system = ActorSystem("MsgApi")
  implicit val timeout = Timeout(5 seconds)
  implicit val materializer = ActorMaterializer()

  val router = system.actorOf(Props(new RouterActor(conf)))

  val route = {
    path("msg") {
      post {
        entity(as[JsObject]) {obj =>
          if(!obj.keys.contains("key1") || !obj.keys.contains("key2") || !obj.keys.contains("key3")){
            complete{
              HttpResponse(status=StatusCodes.BadRequest, entity="Invalid json provided. Required fields: key1, key2, key3 \n")
            }
          } else {
            onSuccess(router ? PostMsg(obj)){
              case PostSuccess => {
                complete{
                  Future{
                    HttpResponse(status = StatusCodes.OK, entity = "Post success")
                  }
                }
              }
              case PostFailure(msg) =>{
                complete{
                  Future{
                    HttpResponse(status = StatusCodes.InternalServerError, entity=msg)
                  }
                }
              }
              case _ => {
                complete{
                  Future{
                    HttpResponse(status = StatusCodes.InternalServerError, entity = "Unknown Server error occurred.")
                  }
                }
              }
            }
          }
        }
      }
    }
  }

  def run():Unit = {
    Http().bindAndHandle(route, interface = conf.getString("http.host"), port = conf.getInt("http.port"))
  }
}

object RunMsgapi {
  def main(Args: Array[String]):Unit = {
    val conf = ConfigFactory.load()
    val api = new Msgapi(conf)
    api.run()
  }
}

路由actor如下:

import akka.actor.{ActorSystem, Props, Actor}
import akka.http.scaladsl.server.RequestContext
import akka.routing.{Router, SmallestMailboxRoutingLogic, ActorRefRoutee}
import com.typesafe.config.Config
import play.api.libs.json.JsObject

class RouterActor(conf:Config) extends Actor{

  val router = {
    val routees = Vector.tabulate(conf.getInt("kafka.producer-number"))(n => {
      val r = context.system.actorOf(Props(new KafkaProducerActor(conf, n )))
      ActorRefRoutee(r)
    })
    Router(SmallestMailboxRoutingLogic(), routees)
  }

  def receive = {
    case PostMsg(msg) => {
      router.route(PostMsg(msg), sender())
    }
  }
}

最后,卡夫卡生产者演员:

import akka.actor.Actor
import java.util.Properties
import com.typesafe.config.Config
import kafka.message.NoCompressionCodec
import kafka.utils.Logging
import org.apache.kafka.clients.producer._
import play.api.libs.json.JsObject
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Await}
import ExecutionContext.Implicits.global

import scala.concurrent.{Future, Await}
import scala.util.{Failure, Success}

class KafkaProducerActor(conf:Config, id:Int) extends Actor with Logging {
  var topic: String = conf.getString("kafka.topic")
  val codec = NoCompressionCodec.codec

  val props = new Properties()
  props.put("bootstrap.servers", conf.getString("kafka.bootstrap-servers"))
  props.put("acks", conf.getString("kafka.acks"))
  props.put("retries", conf.getString("kafka.retries"))
  props.put("batch.size", conf.getString("kafka.batch-size"))
  props.put("linger.ms", conf.getString("kafka.linger-ms"))
  props.put("buffer.memory", conf.getString("kafka.buffer-memory"))
  props.put("key.serializer", conf.getString("kafka.key-serializer"))
  props.put("value.serializer", conf.getString("kafka.value-serializer"))

  val producer = new KafkaProducer[String, String](props)

  def receive = {
    case PostMsg(msg) => {
      // push the msg to Kafka
      try{
        val res = Future{
          producer.send(new ProducerRecord[String, String](topic, msg.toString()))
        }
        val result = Await.result(res, 1 second).get()
        sender ! PostSuccess
      } catch{
        case e: Exception => {
          println(e.printStackTrace())
          sender ! PostFailure("Kafka push error")
        }
      }
    }
  }
}

我的想法是,在 application.conf 中,我可以轻松指定应该有多少生产者,从而实现更好的横向扩展。

然而,现在看来 api 或路由器实际上是瓶颈。作为测试,我禁用了 Kafka 生成代码,并将其替换为一个简单的:sender ! PostSuccess。在 Gatling 中有 3000 个用户,我仍然有 6% 的请求由于超时而失败,这对我来说似乎是一个很长的时间。

我正在执行的 Gatling 测试如下:

import io.gatling.core.Predef._ // 2
import io.gatling.http.Predef._
import scala.concurrent.duration._

class BasicSimulation extends Simulation { // 3
val httpConf = http // 4
    .baseURL("http://localhost:8080") // 5
    .acceptHeader("text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8") // 6
    .doNotTrackHeader("1")
    .acceptLanguageHeader("en-US,en;q=0.5")
    .acceptEncodingHeader("gzip, deflate")
    .userAgentHeader("Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20100101 Firefox/31.0")
    .header("Content-Type", "application/json")

  val scn = scenario("MsgLoadTest")
    .repeat(100)(
      pace(2 seconds)
      .exec(http("request_1")
      .post("/msg").body(StringBody("""{ "key1":"something", "key2": "somethingElse", "key3":2222}""")).asJSON)
    )

  setUp( // 11
    scn.inject(rampUsers(3000) over (5 seconds)) // 12
  ).protocols(httpConf) // 13
}

更新

根据 cmbaxter 的一些指示,我尝试了一些方法(请参阅评论中的讨论),并在 gatling 负载测试期间使用 visualvm 对应用程序进行了分析。我不太清楚如何解释这些结果。似乎很多时间都花在了ThreadPoolExecutor上,但这可能没问题吧? 下面是分析中的两个屏幕截图:

我会完全消除 KafkaProducerActor 和路由器并直接调用 producer.send 的 Scala 封装版本。如果没有必要,为什么要制造一个可能的瓶颈?我可以很好地想象全局执行上下文或参与者系统成为您当前设置中的瓶颈。

像这样应该可以解决问题:

class KafkaScalaProducer(val producer : KafkaProducer[String, String](props)) {
    def send(topic: String, msg : String) : Future[RecordMetadata] = {
        val promise = Promise[RecordMetadata]()
        try {
            producer.send(new ProducerRecord[String, String](topic, msg), new Callback {
                override def onCompletion(md : RecordMetadata, e : java.lang.Exception) {
                    if (md == null) promise.success(md)
                    else promise.failure(e)
                }
            })
        } catch {
            case e : BufferExhaustedException => promise.failure(e)
            case e : KafkaException => promise.failure(e)
        }
        promise.future
    }

    def close = producer.close
}

(注:这段代码我没有实际试过,应该理解为伪代码)

然后我会简单地transform将未来的结果变成HttpResponse

之后就是调整配置的问题了。您现在的瓶颈是 Kafka Producer 或 Akka Http。

为了排除 Kafka 生产者,我从 Actor 中删除了逻辑。我仍然遇到性能问题。因此,作为最终测试,我修改了 API 以在出现 POST 时简单地给出直接答案:

val route = {
    path("msg") {
      post {
        entity(as[String]) { obj =>
          complete(
            HttpResponse(status = StatusCodes.OK, entity = "OK")
          )
        }
      }
    }
  }

我在 Spray 中实现了相同的路由,以比较性能。结果很清楚。 Akka HTTP(至少在目前的测试设置中)并没有接近 Spray 的性能。也许可以对 Akka HTTP 进行一些调整?我附上了两张 Gatling 中 3000 个并发用户的响应时间图的屏幕截图,发出 post 请求。

Akka HTTP