我如何限制发送给 Akka 中的 IO(Tcp) actor 的消息
How can I throttle messages to the IO(Tcp) actor in Akka
我有这样的演员
class TcpClientActor(target: Target) extends Actor with Logger {
override def preStart(): Unit = {
self ! TestConnection
}
override def receive: Receive = {
case TestConnection =>
IO(Tcp) ! Connect(remoteAddress = new InetSocketAddress(target.endpoint, target.port), localAddress = None, options = Nil, timeout = Some(timeout), pullMode = false)
case failed@CommandFailed(_: Connect) =>
info(s"Failure: $target.endpoint:$target.port")
shutdown()
case Connected(_, _) =>
info(s"Success: $target.endpoint:$target.port")
sender() ! Close
shutdown()
}
def shutdown(): Unit = {
context stop self
}
}
我正在迭代一个文件,其中包含要测试的端点,并创建其中一个参与者,每一行都作为 Target
类型的构造函数参数。我希望能够将并行 TCP 连接的数量限制为某个设定的数量,我可以在 Akka 中使用内置机制来确保我不会通过立即创建一个 TcpClientActor
来使系统过载对于每一行输入并启动套接字连接?
我会使用 Akka Stream 来限制消息
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.{ ActorMaterializer, OverflowStrategy, ThrottleMode }
import akka.stream.scaladsl.{ Sink, Source }
object TcpThrottle {
def throttler(ratePerSecond: Int, burstRate: Option[Int], bufferSize: Int = 1000)(implicit materializer: ActorMaterializer): ActorRef =
Source.actorRef(bufferSize = bufferSize, OverflowStrategy.dropNew)
.throttle(ratePerSecond, 1.second, burstRate.getOrElse(ratePerSecond), ThrottleMode.Shaping)
.to(Sink.actorRef(IO(Tcp), NotUsed)
.run()
}
class TcpClientActor(target: Target) extends Actor with Logger {
val throttler = TcpThrottle.throttler(1, Some(5))
// otherwise identical
def receive: Receive = {
case TestConnection => throttler ! Connect(remoteAddress = new InetSocketAddress(target.endpoint, target.port), localAddress = None, options = Nil, timeout = Some(timeout), pullMode = false)
// other cases identical
}
}
改编自The Akka 2.5 migration guide。可能有必要
我有这样的演员
class TcpClientActor(target: Target) extends Actor with Logger {
override def preStart(): Unit = {
self ! TestConnection
}
override def receive: Receive = {
case TestConnection =>
IO(Tcp) ! Connect(remoteAddress = new InetSocketAddress(target.endpoint, target.port), localAddress = None, options = Nil, timeout = Some(timeout), pullMode = false)
case failed@CommandFailed(_: Connect) =>
info(s"Failure: $target.endpoint:$target.port")
shutdown()
case Connected(_, _) =>
info(s"Success: $target.endpoint:$target.port")
sender() ! Close
shutdown()
}
def shutdown(): Unit = {
context stop self
}
}
我正在迭代一个文件,其中包含要测试的端点,并创建其中一个参与者,每一行都作为 Target
类型的构造函数参数。我希望能够将并行 TCP 连接的数量限制为某个设定的数量,我可以在 Akka 中使用内置机制来确保我不会通过立即创建一个 TcpClientActor
来使系统过载对于每一行输入并启动套接字连接?
我会使用 Akka Stream 来限制消息
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.{ ActorMaterializer, OverflowStrategy, ThrottleMode }
import akka.stream.scaladsl.{ Sink, Source }
object TcpThrottle {
def throttler(ratePerSecond: Int, burstRate: Option[Int], bufferSize: Int = 1000)(implicit materializer: ActorMaterializer): ActorRef =
Source.actorRef(bufferSize = bufferSize, OverflowStrategy.dropNew)
.throttle(ratePerSecond, 1.second, burstRate.getOrElse(ratePerSecond), ThrottleMode.Shaping)
.to(Sink.actorRef(IO(Tcp), NotUsed)
.run()
}
class TcpClientActor(target: Target) extends Actor with Logger {
val throttler = TcpThrottle.throttler(1, Some(5))
// otherwise identical
def receive: Receive = {
case TestConnection => throttler ! Connect(remoteAddress = new InetSocketAddress(target.endpoint, target.port), localAddress = None, options = Nil, timeout = Some(timeout), pullMode = false)
// other cases identical
}
}
改编自The Akka 2.5 migration guide。可能有必要