具有 Akka 类型的 Actors 和集群分片的优先邮箱
Priority Mailbox with Akka Typed Actors and Cluster Sharding
我有一个带有类型角色的集群分片应用程序。演员长这样:
object TestActor {
sealed trait Command
final case class Inte(i: Int) extends Command
final case class Stringaki(s: String) extends Command
val TypeKey = EntityTypeKey[Command]("Test")
def defaultThreadBehavior(id: String): Behavior[Command] = Behaviors.setup { ctx =>
Behaviors.receiveMessage { cmd =>
cmd match {
case Inte(i) =>
ctx.log.info(System.currentTimeMillis()/1000 + " Received int: " + i)
Thread.sleep(1000)
case Stringaki(s) =>
ctx.log.info(System.currentTimeMillis()/1000 + " Received string: " + s)
Thread.sleep(1000)
}
Behaviors.same
}
}
}
演员是通过 Sharding Envelope 像这样创建的:
val system_config = ConfigFactory.parseString(
"""
|akka {
| actor {
| provider = "cluster"
| prio-dispatcher {
| type = "Dispatcher"
| mailbox-type = "PriorityMailbox"
| }
| }
| remote {
| netty.tcp {
| hostname = "127.0.0.1"
| port = 2551
| }
| }
| cluster {
| seed-nodes = [
| "akka.tcp://TestApp@127.0.0.1:2551"
| ]
| sharding {
| number-of-shards = 10
| use-dispatcher = "akka.actor.prio-dispatcher"
| }
| }
|}
|""".stripMargin)
val system = ActorSystem(Behaviors.empty[TestActor.Command], "TestApp",system_config)
val sharding = ClusterSharding(system)
val shardRegion = sharding.init(Entity(TestActor.TypeKey, ctx => defaultThreadBehavior(ctx.entityId)))
(0 to 9).foreach{
i =>
shardRegion ! ShardingEnvelope(0.toString, Inte(i))
}
(0 to 9).foreach{
i =>
shardRegion ! ShardingEnvelope(0.toString, Stringaki(i.toString))
}
两个 for 循环向同一个参与者发送消息。第一个循环发送整数,第二个循环发送字符串。当 actor 处理消息时,它会休眠以在队列中建立消息并测试优先级。优先邮箱在系统配置中配置,UnboundedPriorityMailbox 实现如下:
class PriorityMailbox (settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(
PriorityGenerator {
case Stringaki => 0
case _ => 1
}
)
为什么 Actor 按消息到达的顺序打印消息,而不考虑优先级生成器?
为什么您没有看到优先邮箱的效果,简短的回答是您的 TestActor
没有使用优先邮箱,而是默认邮箱。只有 Akka Cluster 分片系统在使用优先邮箱。 Cluster sharding reference.conf akka.cluster.sharding.use-dispatcher
的描述:
# The id of the dispatcher to use for ClusterSharding actors.
# If specified you need to define the settings of the actual dispatcher.
# This dispatcher for the entity actors is defined by the user provided
# Props, i.e. this dispatcher is not used for the entity actors.
确实,您发送的每条消息都会通过优先邮箱,但由于集群分片内部的参与者没有睡觉,因此不会出现积压(尽管在某些情况下,尤其是在内核较少的情况下,可能是优先级可以发挥作用的积压工作。
要让实体参与者 运行 在具有优先邮箱的调度程序中,您需要类似
val entityDispatcherProps = DispatcherSelector.fromConfig("akka.actor.prio-dispatcher")
val baseEntity = Entity(TestActor.TypeKey)(ctx => defaultThreadBehavior(ctx.entityId))
val shardRegion = sharding.init(baseEntity.withEntityProps(entityDispatcherProps))
我有一个带有类型角色的集群分片应用程序。演员长这样:
object TestActor {
sealed trait Command
final case class Inte(i: Int) extends Command
final case class Stringaki(s: String) extends Command
val TypeKey = EntityTypeKey[Command]("Test")
def defaultThreadBehavior(id: String): Behavior[Command] = Behaviors.setup { ctx =>
Behaviors.receiveMessage { cmd =>
cmd match {
case Inte(i) =>
ctx.log.info(System.currentTimeMillis()/1000 + " Received int: " + i)
Thread.sleep(1000)
case Stringaki(s) =>
ctx.log.info(System.currentTimeMillis()/1000 + " Received string: " + s)
Thread.sleep(1000)
}
Behaviors.same
}
}
}
演员是通过 Sharding Envelope 像这样创建的:
val system_config = ConfigFactory.parseString(
"""
|akka {
| actor {
| provider = "cluster"
| prio-dispatcher {
| type = "Dispatcher"
| mailbox-type = "PriorityMailbox"
| }
| }
| remote {
| netty.tcp {
| hostname = "127.0.0.1"
| port = 2551
| }
| }
| cluster {
| seed-nodes = [
| "akka.tcp://TestApp@127.0.0.1:2551"
| ]
| sharding {
| number-of-shards = 10
| use-dispatcher = "akka.actor.prio-dispatcher"
| }
| }
|}
|""".stripMargin)
val system = ActorSystem(Behaviors.empty[TestActor.Command], "TestApp",system_config)
val sharding = ClusterSharding(system)
val shardRegion = sharding.init(Entity(TestActor.TypeKey, ctx => defaultThreadBehavior(ctx.entityId)))
(0 to 9).foreach{
i =>
shardRegion ! ShardingEnvelope(0.toString, Inte(i))
}
(0 to 9).foreach{
i =>
shardRegion ! ShardingEnvelope(0.toString, Stringaki(i.toString))
}
两个 for 循环向同一个参与者发送消息。第一个循环发送整数,第二个循环发送字符串。当 actor 处理消息时,它会休眠以在队列中建立消息并测试优先级。优先邮箱在系统配置中配置,UnboundedPriorityMailbox 实现如下:
class PriorityMailbox (settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(
PriorityGenerator {
case Stringaki => 0
case _ => 1
}
)
为什么 Actor 按消息到达的顺序打印消息,而不考虑优先级生成器?
为什么您没有看到优先邮箱的效果,简短的回答是您的 TestActor
没有使用优先邮箱,而是默认邮箱。只有 Akka Cluster 分片系统在使用优先邮箱。 Cluster sharding reference.conf akka.cluster.sharding.use-dispatcher
的描述:
# The id of the dispatcher to use for ClusterSharding actors.
# If specified you need to define the settings of the actual dispatcher.
# This dispatcher for the entity actors is defined by the user provided
# Props, i.e. this dispatcher is not used for the entity actors.
确实,您发送的每条消息都会通过优先邮箱,但由于集群分片内部的参与者没有睡觉,因此不会出现积压(尽管在某些情况下,尤其是在内核较少的情况下,可能是优先级可以发挥作用的积压工作。
要让实体参与者 运行 在具有优先邮箱的调度程序中,您需要类似
val entityDispatcherProps = DispatcherSelector.fromConfig("akka.actor.prio-dispatcher")
val baseEntity = Entity(TestActor.TypeKey)(ctx => defaultThreadBehavior(ctx.entityId))
val shardRegion = sharding.init(baseEntity.withEntityProps(entityDispatcherProps))