如何使用 Akka 的非阻塞代码持续调用 REST 服务
How to continually call a REST service using non blocking code with Akka
我正在从 REST 端点访问数据:
"https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"
为了每秒访问一次数据,我使用了一个无限循环 while(true) {
来调用一条每秒发送给参与者的消息,这开始了调用 REST 请求的过程:
访问数据的actor
是:
object ProductTickerRestActor {
case class StringData(data: String)
}
class ProductTickerRestActor extends Actor {
override def receive: PartialFunction[Any, Unit] = {
case ProductTickerRestActor.StringData(data) =>
try {
println("in ProductTickerRestActor")
val rData = scala.io.Source.fromURL("https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker").mkString
println("rData : "+rData)
}
catch {
case e: Exception =>
println("Exception thrown in ProductTickerRestActor: " + e.getMessage)
}
case msg => println(s"I cannot understand ${msg.toString}")
}
}
我使用以下方式启动应用程序:
object ExchangeModelDataApplication {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystemConfig.getActorSystem
val priceDataActor = actorSystem.actorOf(Props[ProductTickerRestActor], "ProductTickerRestActor")
val throttler = Throttlers.getThrottler(priceDataActor)
while(true) {
throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000)
}
}
节流阀:
object Throttlers {
implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)
def getThrottler(priceDataActor: ActorRef) = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(1, 1.second)
.to(Sink.actorRef(priceDataActor, NotUsed))
.run()
}
如何 运行 以下代码异步而不是使用无限循环阻塞? :
throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000)
此外,在这种情况下,节流器可能是多余的,因为我无论如何都在循环中节流请求。
我会为此使用 Akka Streams 和 Akka HTTP。使用 Akka 2.6.x,沿着这些思路的东西就足够了 1 request/second
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl._
import scala.concurrent.duration._
object HTTPRepeatedly {
implicit val system = ActorSystem()
import system.dispatcher
val sourceFromHttp: Source[String, NotUsed] =
Source.repeated("test") // Not sure what "test" is actually used for here...
.throttle(1, 1.second)
.map { str =>
HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker")
}.mapAsync(1) { req =>
Http().singleRequest(req)
}.mapAsync(1)(_.entity.toStrict(1.minute))
.map(_.data.decodeString(java.nio.charset.StandardCharsets.UTF_8))
}
然后你可以,例如(为简单起见,将其放在 HTTPRepeatedly
内的 main
中,这样隐式就在范围内等)
val done: Future[Done] =
sourceFromHttp
.take(10) // stop after 10 requests
.runWith(Sink.foreach { rData => println(s"rData: $rData") })
scala.concurrent.Await.result(done, 11.minute)
system.terminate()
每秒发送一个请求不是一个好主意。如果由于某种原因请求被延迟,您将收到大量请求。相反,在前一个请求完成后一秒发送下一个请求。
因为此代码使用同步 GET
请求,您可以在 mkString
returns.
后一秒发送下一个请求
但是在 Akka 中使用同步请求并不是消费一个 RESTful API 的好方法。它会阻塞 actor receive
方法,直到请求完成,最终会阻塞整个 ActorSystem
.
改为使用 Akka Http 和 singleRequest
执行异步请求。
Http().singleRequest(HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"))
这个returns一个Future
。在请求完成一秒后发出新请求(例如,在 Future
上使用 onComplete
)。
这不仅更安全、更异步,而且与 fromUrl
相比,它还提供了对 REST API 调用更多的控制
我正在从 REST 端点访问数据:
"https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"
为了每秒访问一次数据,我使用了一个无限循环 while(true) {
来调用一条每秒发送给参与者的消息,这开始了调用 REST 请求的过程:
访问数据的actor
是:
object ProductTickerRestActor {
case class StringData(data: String)
}
class ProductTickerRestActor extends Actor {
override def receive: PartialFunction[Any, Unit] = {
case ProductTickerRestActor.StringData(data) =>
try {
println("in ProductTickerRestActor")
val rData = scala.io.Source.fromURL("https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker").mkString
println("rData : "+rData)
}
catch {
case e: Exception =>
println("Exception thrown in ProductTickerRestActor: " + e.getMessage)
}
case msg => println(s"I cannot understand ${msg.toString}")
}
}
我使用以下方式启动应用程序:
object ExchangeModelDataApplication {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystemConfig.getActorSystem
val priceDataActor = actorSystem.actorOf(Props[ProductTickerRestActor], "ProductTickerRestActor")
val throttler = Throttlers.getThrottler(priceDataActor)
while(true) {
throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000)
}
}
节流阀:
object Throttlers {
implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)
def getThrottler(priceDataActor: ActorRef) = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(1, 1.second)
.to(Sink.actorRef(priceDataActor, NotUsed))
.run()
}
如何 运行 以下代码异步而不是使用无限循环阻塞? :
throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000)
此外,在这种情况下,节流器可能是多余的,因为我无论如何都在循环中节流请求。
我会为此使用 Akka Streams 和 Akka HTTP。使用 Akka 2.6.x,沿着这些思路的东西就足够了 1 request/second
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl._
import scala.concurrent.duration._
object HTTPRepeatedly {
implicit val system = ActorSystem()
import system.dispatcher
val sourceFromHttp: Source[String, NotUsed] =
Source.repeated("test") // Not sure what "test" is actually used for here...
.throttle(1, 1.second)
.map { str =>
HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker")
}.mapAsync(1) { req =>
Http().singleRequest(req)
}.mapAsync(1)(_.entity.toStrict(1.minute))
.map(_.data.decodeString(java.nio.charset.StandardCharsets.UTF_8))
}
然后你可以,例如(为简单起见,将其放在 HTTPRepeatedly
内的 main
中,这样隐式就在范围内等)
val done: Future[Done] =
sourceFromHttp
.take(10) // stop after 10 requests
.runWith(Sink.foreach { rData => println(s"rData: $rData") })
scala.concurrent.Await.result(done, 11.minute)
system.terminate()
每秒发送一个请求不是一个好主意。如果由于某种原因请求被延迟,您将收到大量请求。相反,在前一个请求完成后一秒发送下一个请求。
因为此代码使用同步 GET
请求,您可以在 mkString
returns.
但是在 Akka 中使用同步请求并不是消费一个 RESTful API 的好方法。它会阻塞 actor receive
方法,直到请求完成,最终会阻塞整个 ActorSystem
.
改为使用 Akka Http 和 singleRequest
执行异步请求。
Http().singleRequest(HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"))
这个returns一个Future
。在请求完成一秒后发出新请求(例如,在 Future
上使用 onComplete
)。
这不仅更安全、更异步,而且与 fromUrl