如何使用 Akka 的非阻塞代码持续调用 REST 服务

How to continually call a REST service using non blocking code with Akka

我正在从 REST 端点访问数据:

"https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"

为了每秒访问一次数据,我使用了一个无限循环 while(true) { 来调用一条每秒发送给参与者的消息,这开始了调用 REST 请求的过程:

访问数据的actor是:

object ProductTickerRestActor {

  case class StringData(data: String)

}

class ProductTickerRestActor extends Actor {
  
  override def receive: PartialFunction[Any, Unit] = {

    case ProductTickerRestActor.StringData(data) =>
      try {
        println("in ProductTickerRestActor")
        val rData = scala.io.Source.fromURL("https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker").mkString
        println("rData : "+rData)

      }
      catch {
        case e: Exception =>
          println("Exception thrown in ProductTickerRestActor: " + e.getMessage)
      }

    case msg => println(s"I cannot understand ${msg.toString}")
  }
}

我使用以下方式启动应用程序:

object ExchangeModelDataApplication {

  def main(args: Array[String]): Unit = {

    val actorSystem = ActorSystemConfig.getActorSystem

    val priceDataActor = actorSystem.actorOf(Props[ProductTickerRestActor], "ProductTickerRestActor")
    val throttler = Throttlers.getThrottler(priceDataActor)
    while(true) {
      throttler ! ProductTickerRestActor.StringData("test")
      Thread.sleep(1000)
    }

}

节流阀:

object Throttlers {


  implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)

  def getThrottler(priceDataActor: ActorRef) = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
    .throttle(1, 1.second)
    .to(Sink.actorRef(priceDataActor, NotUsed))
    .run()
}

如何 运行 以下代码异步而不是使用无限循环阻塞? :

throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000) 

此外,在这种情况下,节流器可能是多余的,因为我无论如何都在循环中节流请求。

我会为此使用 Akka Streams 和 Akka HTTP。使用 Akka 2.6.x,沿着这些思路的东西就足够了 1 request/second

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl._

import scala.concurrent.duration._

object HTTPRepeatedly {
  implicit val system = ActorSystem()
  import system.dispatcher

  val sourceFromHttp: Source[String, NotUsed] =
    Source.repeated("test") // Not sure what "test" is actually used for here...
      .throttle(1, 1.second)
      .map { str =>
        HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker")
      }.mapAsync(1) { req =>
        Http().singleRequest(req)
      }.mapAsync(1)(_.entity.toStrict(1.minute))
      .map(_.data.decodeString(java.nio.charset.StandardCharsets.UTF_8))
}

然后你可以,例如(为简单起见,将其放在 HTTPRepeatedly 内的 main 中,这样隐式就在范围内等)

val done: Future[Done] =
  sourceFromHttp
    .take(10) // stop after 10 requests
    .runWith(Sink.foreach { rData => println(s"rData: $rData") })

scala.concurrent.Await.result(done, 11.minute)

system.terminate()

每秒发送一个请求不是一个好主意。如果由于某种原因请求被延迟,您将收到大量请求。相反,在前一个请求完成后一秒发送下一个请求。

因为此代码使用同步 GET 请求,您可以在 mkString returns.

后一秒发送下一个请求

但是在 Akka 中使用同步请求并不是消费一个 RESTful API 的好方法。它会阻塞 actor receive 方法,直到请求完成,最终会阻塞整个 ActorSystem.

改为使用 Akka Http 和 singleRequest 执行异步请求。

Http().singleRequest(HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"))

这个returns一个Future。在请求完成一秒后发出新请求(例如,在 Future 上使用 onComplete)。

这不仅更安全、更异步,而且与 fromUrl

相比,它还提供了对 REST API 调用更多的控制