使用 Scala 调节器改变每条消息之间的时间
Varying the time between each message using Scala throttler
我正在学习 Akka 并编写了以下代码来限制发送给 TradeAction
演员的消息数量。每秒最多发送 3 条消息。是否可以修改节流器以便设置每条消息之间的时间?例如message1和message2之间延迟2秒,message2和message3之间延迟1秒。
import akka.NotUsed
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import systemconfig.ActorSystemConfig
import scala.concurrent.duration.DurationInt
case class Action(side: String)
object PlaceTrade {
val actorSystem = ActorSystem("firstActorSystem")
println(actorSystem.name)
object TradeAction {
def props(action : String) = Props(new TradeAction(action))
}
class TradeAction(actorName: String) extends Actor {
override def receive: Receive = {
case "Buy" => {
val r = requests.get("http://www.google.com")
println("r status code is "+r.statusCode)
println("Buy")
println("")
}
case "Sell" => {
val r = requests.get("http://www.google.com")
println("r status code is "+r.statusCode)
println("Sell")
println("")
}
case _ =>
}
}
implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)
def getThrottler(ac: ActorRef) = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(1, 3.second)
.to(Sink.actorRef(ac, NotUsed))
.run()
def main(args: Array[String]): Unit = {
val tradeAction = actorSystem.actorOf(TradeAction.props("TradeAction"))
val throttler = getThrottler(tradeAction)
val l = List(Action("Buy"),Action("Buy"),Action("Buy"),Action("Sell"))
l.foreach(action => {
throttler ! action.side
})
}
}
您可以使用 delayWith
完成此操作,它允许(可能是有状态的)方法定义延迟此元素多长时间(无需重新排序元素),例如:
import akka.stream.scaladsl.{ DelayStrategy, DelayOverflowStrategy }
import scala.concurrent.duration.FiniteDuration
def decliningDelay(): DelayStrategy[Any] =
new DelayStrategy {
var nextDelaySeconds: Option[Int] = None
def nextDelay(elem: Any): FiniteDuration =
nextDelaySeconds match {
case None =>
nextDelaySeconds = 2
0.seconds
case Some(delay) if delay > 0 =>
nextDelaySeconds = delay - 1
delay.seconds
case _ => 0.seconds
}
}
def getThrottler(ac: ActorRef): ActorRef =
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.delayWith(() => decliningDelay(), DelayOverflowStrategy.backpressure)
.to(Sink.actorRef(ac, NotUsed))
.run()
您可以将 delayWith
与 throttle
结合使用;我可能会把 delayWith
放在 throttle
之前。
我正在学习 Akka 并编写了以下代码来限制发送给 TradeAction
演员的消息数量。每秒最多发送 3 条消息。是否可以修改节流器以便设置每条消息之间的时间?例如message1和message2之间延迟2秒,message2和message3之间延迟1秒。
import akka.NotUsed
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import systemconfig.ActorSystemConfig
import scala.concurrent.duration.DurationInt
case class Action(side: String)
object PlaceTrade {
val actorSystem = ActorSystem("firstActorSystem")
println(actorSystem.name)
object TradeAction {
def props(action : String) = Props(new TradeAction(action))
}
class TradeAction(actorName: String) extends Actor {
override def receive: Receive = {
case "Buy" => {
val r = requests.get("http://www.google.com")
println("r status code is "+r.statusCode)
println("Buy")
println("")
}
case "Sell" => {
val r = requests.get("http://www.google.com")
println("r status code is "+r.statusCode)
println("Sell")
println("")
}
case _ =>
}
}
implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)
def getThrottler(ac: ActorRef) = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(1, 3.second)
.to(Sink.actorRef(ac, NotUsed))
.run()
def main(args: Array[String]): Unit = {
val tradeAction = actorSystem.actorOf(TradeAction.props("TradeAction"))
val throttler = getThrottler(tradeAction)
val l = List(Action("Buy"),Action("Buy"),Action("Buy"),Action("Sell"))
l.foreach(action => {
throttler ! action.side
})
}
}
您可以使用 delayWith
完成此操作,它允许(可能是有状态的)方法定义延迟此元素多长时间(无需重新排序元素),例如:
import akka.stream.scaladsl.{ DelayStrategy, DelayOverflowStrategy }
import scala.concurrent.duration.FiniteDuration
def decliningDelay(): DelayStrategy[Any] =
new DelayStrategy {
var nextDelaySeconds: Option[Int] = None
def nextDelay(elem: Any): FiniteDuration =
nextDelaySeconds match {
case None =>
nextDelaySeconds = 2
0.seconds
case Some(delay) if delay > 0 =>
nextDelaySeconds = delay - 1
delay.seconds
case _ => 0.seconds
}
}
def getThrottler(ac: ActorRef): ActorRef =
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.delayWith(() => decliningDelay(), DelayOverflowStrategy.backpressure)
.to(Sink.actorRef(ac, NotUsed))
.run()
您可以将 delayWith
与 throttle
结合使用;我可能会把 delayWith
放在 throttle
之前。