当从 Akka-2.6.9 更新到 Akka 2.6.10 时,ShardCoordinator$LeastShardAllocationStrategy 出现 NullPointerException
NullPointerException in ShardCoordinator$LeastShardAllocationStrategy when updated to Akka 2.6.10 from Akka-2.6.9
在将 Akka 版本从 2.6.9 升级到 2.6.10 之后,我遇到了 java.lang.NullPointerException。
下面是能够复现该错误的示例代码:
- akka-sharding/src/main/resources/application.conf
# Akka configuration for the node started by the reproduction program.
akka {
actor {
# Use the cluster-aware actor-ref provider.
provider = "cluster"
}
remote.artery {
canonical {
# Host and port this node advertises for Artery remoting.
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
# Two seed nodes on localhost; this configuration itself binds to port 2551.
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:2551",
"akka://ClusterSystem@127.0.0.1:2552"]
# Downing decisions are delegated to the Split Brain Resolver provider class.
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
- akka-sharding/src/main/scala/MyActorSharding.scala
import akka.actor.{ActorLogging, ActorRef, ActorSystem, PoisonPill, Props}
import akka.cluster.Cluster
import akka.cluster.sharding.ShardCoordinator.{LeastShardAllocationStrategy, ShardAllocationStrategy}
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import scala.concurrent.Future
import scala.util.Random
/**
 * Reproduction program: starts classic Cluster Sharding for one entity type
 * ("MyActor") with a custom shard allocation strategy, then sends the shard
 * region three Int messages and one "print" message.
 */
object MyActorSharding extends App {
// Sharded entity: adds every Int it receives to `index`; on "print" it prints
// the current total followed by its own actor path.
class Actor extends akka.actor.Actor with ActorLogging {
private var index = 0
override def receive: Receive = {
case x: Int =>
index += x
case "print" =>
println("???????????????" + index)
println(self.path.toString)
}
}
// Logging wrapper: prints a marker line and then delegates each call to the
// wrapped strategy. It exposes only allocateShard/rebalance, i.e. no start hook.
// NOTE(review): the Akka 2.6.10 stack traces quoted in this document show the
// NullPointerException being thrown from these two delegated calls, inside
// AbstractLeastShardAllocationStrategy.clusterState. Presumably the wrapped
// LeastShardAllocationStrategy is never started when hidden behind this
// wrapper — confirm against the Akka 2.6.10 sources.
class MyShardAllocationStrategy(shardAllocationStrategy: ShardAllocationStrategy)
extends ShardAllocationStrategy {
override def allocateShard(requester: ActorRef,
shardId: ShardId,
currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]]): Future[ActorRef] = {
println("Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..")
shardAllocationStrategy.allocateShard(requester, shardId, currentShardAllocations)
}
override def rebalance(currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]],
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
println("Rebalance Shard ->>>>>>>>>>>>>>>>...")
shardAllocationStrategy.rebalance(currentShardAllocations, rebalanceInProgress)
}
}
// Entity-id extractor: accepts any message and builds the id from the message
// text plus a random number in [0, 34); the message itself is forwarded unchanged.
// NOTE(review): because of the random suffix the same message value does not
// map to a stable entity id.
val extractEntityId: PartialFunction[Any, (String, Any)] = {
case message =>
(
s"${message.toString} -> entityId -> ${new Random().nextInt(34)}",
message
)
}
// Shard-id extractor: message text plus an independent random number in [0, 34).
val shardId: Any => String = x =>
s"${x.toString} -> shardId -> ${new Random().nextInt(34)}"
val actorProps: Props = Props[Actor]
// The actor system name matches the one used in the configured seed-node addresses.
val system = ActorSystem("ClusterSystem")
val cluster = Cluster(system)
val clusterSharding = ClusterSharding(system)
// Start the shard region for "MyActor"; PoisonPill is passed as the last
// argument of ClusterSharding.start (the hand-off stop message).
val actorRef = clusterSharding.start(
"MyActor",
actorProps,
ClusterShardingSettings(system),
extractEntityId,
shardId,
new MyShardAllocationStrategy(new LeastShardAllocationStrategy(10, 3)),
PoisonPill
)
// Drive the sample: three increments followed by a print request.
actorRef ! 12
actorRef ! 13
actorRef ! 14
actorRef ! "print"
}
首先,使用 Akka 2.6.9 时:
akka-sharding/build.sbt
// Akka version shared by both Akka modules below (the 2.6.9 variant of the build).
val akkaVersion = "2.6.9"
// Single-project sbt build rooted at the current directory.
lazy val root = (project in file("."))
.settings(
name := "akka-sharding",
version := "0.1",
scalaVersion := "2.13.5",
// Only the cluster and cluster-sharding modules are declared explicitly.
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion
)
)
我得到的输出为:
[INFO] [05/04/2021 15:37:29.970] [main] [ArteryTcpTransport(akka://ClusterSystem)] Remoting started with transport [Artery tcp]; listening on address [akka://ClusterSystem@127.0.0.1:2551] with UID [3133807086855433188]
[INFO] [05/04/2021 15:37:29.990] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Starting up, Akka version [2.6.9] ...
[INFO] [05/04/2021 15:37:30.232] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [05/04/2021 15:37:30.233] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Started up successfully
[INFO] [05/04/2021 15:37:30.327] [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] SBR started. Config: stableAfter: 20000 ms, strategy: KeepMajority, selfUniqueAddress: UniqueAddress(akka://ClusterSystem@127.0.0.1:2551,3133807086855433188), selfDc: default
[INFO] [05/04/2021 15:37:30.756] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Idle entities will be passivated after [2.000 min]
[WARN] [05/04/2021 15:37:30.806] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552], control stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[WARN] [05/04/2021 15:37:30.807] [ClusterSystem-akka.remote.default-remote-dispatcher-8] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552], message stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[INFO] [akkaMemberChanged][05/04/2021 15:37:35.428] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Node [akka://ClusterSystem@127.0.0.1:2551] is JOINING itself (with roles [dc-default]) and forming new cluster
[INFO] [05/04/2021 15:37:35.430] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[INFO] [akkaMemberChanged][05/04/2021 15:37:35.435] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [05/04/2021 15:37:35.443] [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[INFO] [akkaClusterSingletonStarted][05/04/2021 15:37:35.449] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager starting singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:37:35.451] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Start -> Oldest]
[INFO] [05/04/2021 15:37:35.462] [ClusterSystem-akka.actor.default-dispatcher-11] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] ShardCoordinator was moved to the active state State(Map())
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
???????????????0
akka://ClusterSystem/system/sharding/MyActor/print+-%3E+shardId+-%3E+26/print+-%3E+entityId+-%3E+1
Rebalance Shard ->>>>>>>>>>>>>>>>...
[WARN] [05/04/2021 15:37:50.452] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [Association(akka://ClusterSystem)] Outbound control stream to [akka://ClusterSystem@127.0.0.1:2552] failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://ClusterSystem@127.0.0.1:2552] did not complete within 20000 ms
Rebalance Shard ->>>>>>>>>>>>>>>>...
Rebalance Shard ->>>>>>>>>>>>>>>>...
Rebalance Shard ->>>>>>>>>>>>>>>>...
[INFO] [05/04/2021 15:38:18.737] [ClusterSystem-akka.actor.default-dispatcher-11] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] Starting rebalance for shards [print -> shardId -> 26,14 -> shardId -> 10,13 -> shardId -> 4,12 -> shardId -> 20]. Current shards rebalancing: []
[INFO] [akkaMemberChanged][05/04/2021 15:38:18.756] [ClusterSystem-akka.actor.internal-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Marked address [akka://ClusterSystem@127.0.0.1:2551] as [Leaving]
[INFO] [05/04/2021 15:38:18.759] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Exited [akka://ClusterSystem@127.0.0.1:2551]
[INFO] [05/04/2021 15:38:18.759] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Oldest observed OldestChanged: [akka://ClusterSystem@127.0.0.1:2551 -> None]
[INFO] [05/04/2021 15:38:18.760] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Oldest -> WasOldest]
[INFO] [akkaMemberChanged][05/04/2021 15:38:19.101] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Exiting]
[INFO] [05/04/2021 15:38:19.771] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager stopping singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:38:19.771] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [WasOldest -> Stopping]
[INFO] [akkaClusterSingletonTerminated][05/04/2021 15:38:19.773] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton] was terminated
[INFO] [05/04/2021 15:38:19.778] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Exiting completed
[INFO] [05/04/2021 15:38:19.779] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Shutting down...
[INFO] [05/04/2021 15:38:19.780] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Successfully shut down
[INFO] [akkaDeadLetter][05/04/2021 15:38:19.780] [ClusterSystem-akka.actor.default-dispatcher-26] [akka://ClusterSystem/system/clusterEventBusListener] Message [akka.cluster.ClusterEvent$SeenChanged] to Actor[akka://ClusterSystem/system/clusterEventBusListener#636946749] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [05/04/2021 15:38:19.785] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Shutting down remote daemon.
[INFO] [05/04/2021 15:38:19.785] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [05/04/2021 15:38:19.799] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.
而使用 Akka 2.6.10 时:
- akka-sharding/build.sbt
// Akka version shared by both Akka modules below (the 2.6.10 variant of the build).
val akkaVersion = "2.6.10"
// Single-project sbt build rooted at the current directory.
lazy val root = (project in file("."))
.settings(
name := "akka-sharding",
version := "0.1",
scalaVersion := "2.13.5",
// Only the cluster and cluster-sharding modules are declared explicitly.
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion
)
)
我在调用 rebalance 和 allocateShard 时都收到了 java.lang.NullPointerException:
[INFO] [05/04/2021 15:41:44.061] [main] [ArteryTcpTransport(akka://ClusterSystem)] Remoting started with transport [Artery tcp]; listening on address [akka://ClusterSystem@127.0.0.1:2551] with UID [-8668970757003113418]
[INFO] [05/04/2021 15:41:44.081] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Starting up, Akka version [2.6.10] ...
[INFO] [05/04/2021 15:41:44.198] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [05/04/2021 15:41:44.198] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Started up successfully
[INFO] [05/04/2021 15:41:44.246] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] SBR started. Config: strategy [KeepMajority], stable-after [20 seconds], down-all-when-unstable [15 seconds], selfUniqueAddress [akka://ClusterSystem@127.0.0.1:2551#-8668970757003113418], selfDc [default].
[INFO] [05/04/2021 15:41:44.496] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Idle entities will be passivated after [2.000 min]
[WARN] [05/04/2021 15:41:44.541] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552], message stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[WARN] [05/04/2021 15:41:44.541] [ClusterSystem-akka.remote.default-remote-dispatcher-9] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552], control stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[INFO] [akkaMemberChanged][05/04/2021 15:41:49.349] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Node [akka://ClusterSystem@127.0.0.1:2551] is JOINING itself (with roles [dc-default], version [0.0.0]) and forming new cluster
[INFO] [05/04/2021 15:41:49.351] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[INFO] [akkaMemberChanged][05/04/2021 15:41:49.357] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [05/04/2021 15:41:49.367] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[INFO] [akkaClusterSingletonStarted][05/04/2021 15:41:49.372] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager starting singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:41:49.372] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Start -> Oldest]
[INFO] [05/04/2021 15:41:49.384] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
[ERROR] [05/04/2021 15:41:49.655] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] null
java.lang.NullPointerException
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.clusterState(AbstractLeastShardAllocationStrategy.scala:78)
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.regionEntriesFor(AbstractLeastShardAllocationStrategy.scala:124)
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.allocateShard(AbstractLeastShardAllocationStrategy.scala:85)
at MyActorSharding$MyShardAllocationStrategy.allocateShard(MyActorSharding.scala:31)
at akka.cluster.sharding.ShardCoordinator$$anonfun$active.applyOrElse(ShardCoordinator.scala:748)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:244)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.cluster.sharding.DDataShardCoordinator.akka$actor$Timers$$super$aroundReceive(ShardCoordinator.scala:1335)
at akka.actor.Timers.aroundReceive(Timers.scala:56)
at akka.actor.Timers.aroundReceive$(Timers.scala:41)
at akka.cluster.sharding.DDataShardCoordinator.aroundReceive(ShardCoordinator.scala:1335)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
[INFO] [akkaDeadLetter][05/04/2021 15:41:49.669] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] Message [akka.cluster.sharding.ShardCoordinator$Internal$GetShardHome] from Actor[akka://ClusterSystem/system/sharding/MyActor#-1153927922] to Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][05/04/2021 15:41:49.669] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] Message [akka.cluster.sharding.ShardCoordinator$Internal$GetShardHome] from Actor[akka://ClusterSystem/system/sharding/MyActor#-1153927922] to Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829] was unhandled. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][05/04/2021 15:41:49.670] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] Message [akka.cluster.sharding.ShardCoordinator$Internal$GetShardHome] from Actor[akka://ClusterSystem/system/sharding/MyActor#-1153927922] to Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829] was unhandled. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [05/04/2021 15:41:50.171] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
[WARN] [05/04/2021 15:41:54.591] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Requested shard homes [12 -> shardId -> 7,13 -> shardId -> 33,14 -> shardId -> 13,print -> shardId -> 29] from coordinator at [Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829]]. [4] total buffered messages.
Rebalance Shard ->>>>>>>>>>>>>>>>...
[ERROR] [05/04/2021 15:41:59.672] [ClusterSystem-akka.actor.default-dispatcher-22] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] null
java.lang.NullPointerException
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.clusterState(AbstractLeastShardAllocationStrategy.scala:78)
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.regionEntriesFor(AbstractLeastShardAllocationStrategy.scala:124)
at akka.cluster.sharding.ShardCoordinator$LeastShardAllocationStrategy.rebalance(ShardCoordinator.scala:294)
at MyActorSharding$MyShardAllocationStrategy.rebalance(MyActorSharding.scala:37)
at akka.cluster.sharding.ShardCoordinator$$anonfun$active.applyOrElse(ShardCoordinator.scala:790)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:244)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.cluster.sharding.DDataShardCoordinator.akka$actor$Timers$$super$aroundReceive(ShardCoordinator.scala:1335)
at akka.actor.Timers.aroundReceive(Timers.scala:56)
at akka.actor.Timers.aroundReceive$(Timers.scala:41)
at akka.cluster.sharding.DDataShardCoordinator.aroundReceive(ShardCoordinator.scala:1335)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
...
[INFO] [05/04/2021 15:42:00.191] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
[WARN] [05/04/2021 15:42:04.352] [ClusterSystem-akka.remote.default-remote-dispatcher-9] [Association(akka://ClusterSystem)] Outbound control stream to [akka://ClusterSystem@127.0.0.1:2552] failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://ClusterSystem@127.0.0.1:2552] did not complete within 20000 ms
[WARN] [05/04/2021 15:42:04.671] [ClusterSystem-akka.actor.internal-dispatcher-4] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Requested shard homes [12 -> shardId -> 7,13 -> shardId -> 33,14 -> shardId -> 13,print -> shardId -> 29] from coordinator at [Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829]]. [4] total buffered messages.
Rebalance Shard ->>>>>>>>>>>>>>>>...
[ERROR] [05/04/2021 15:42:09.692] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] null
java.lang.NullPointerException
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.clusterState(AbstractLeastShardAllocationStrategy.scala:78)
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.regionEntriesFor(AbstractLeastShardAllocationStrategy.scala:124)
at akka.cluster.sharding.ShardCoordinator$LeastShardAllocationStrategy.rebalance(ShardCoordinator.scala:294)
at MyActorSharding$MyShardAllocationStrategy.rebalance(MyActorSharding.scala:37)
at akka.cluster.sharding.ShardCoordinator$$anonfun$active.applyOrElse(ShardCoordinator.scala:790)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:244)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.cluster.sharding.DDataShardCoordinator.akka$actor$Timers$$super$aroundReceive(ShardCoordinator.scala:1335)
at akka.actor.Timers.aroundReceive(Timers.scala:56)
at akka.actor.Timers.aroundReceive$(Timers.scala:41)
at akka.cluster.sharding.DDataShardCoordinator.aroundReceive(ShardCoordinator.scala:1335)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
...
[INFO] [05/04/2021 15:42:10.211] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
[WARN] [05/04/2021 15:42:14.771] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Requested shard homes [12 -> shardId -> 7,13 -> shardId -> 33,14 -> shardId -> 13,print -> shardId -> 29] from coordinator at [Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829]]. [4] total buffered messages.
[WARN] [05/04/2021 15:42:15.521] [ClusterSystem-akka.actor.internal-dispatcher-2] [CoordinatedShutdown(akka://ClusterSystem)] Coordinated shutdown phase [cluster-sharding-shutdown-region] timed out after 10000 milliseconds
[INFO] [akkaMemberChanged][05/04/2021 15:42:15.524] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Marked address [akka://ClusterSystem@127.0.0.1:2551] as [Leaving]
[INFO] [05/04/2021 15:42:15.527] [ClusterSystem-akka.actor.internal-dispatcher-4] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Exited [akka://ClusterSystem@127.0.0.1:2551]
[INFO] [05/04/2021 15:42:15.528] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Oldest observed OldestChanged: [akka://ClusterSystem@127.0.0.1:2551 -> None]
[INFO] [05/04/2021 15:42:15.529] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Oldest -> WasOldest]
[INFO] [akkaMemberChanged][05/04/2021 15:42:15.771] [ClusterSystem-akka.actor.internal-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Exiting]
[INFO] [05/04/2021 15:42:16.541] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager stopping singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:42:16.541] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [WasOldest -> Stopping]
[INFO] [akkaClusterSingletonTerminated][05/04/2021 15:42:16.544] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton] was terminated
[WARN] [05/04/2021 15:42:16.544] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Trying to register to coordinator at [ActorSelection[Anchor(akka://ClusterSystem/), Path(/system/sharding/MyActorCoordinator/singleton/coordinator)]], but no acknowledgement. Total [4] buffered messages. [Coordinator [Member(akka://ClusterSystem@127.0.0.1:2551, Exiting)] is reachable.]
[INFO] [05/04/2021 15:42:16.548] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Exiting completed
[INFO] [05/04/2021 15:42:16.549] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Shutting down...
[INFO] [05/04/2021 15:42:16.550] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Successfully shut down
[INFO] [05/04/2021 15:42:16.557] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Shutting down remote daemon.
[INFO] [05/04/2021 15:42:16.558] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [05/04/2021 15:42:16.573] [ClusterSystem-akka.remote.default-remote-dispatcher-9] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.
谁能告诉我为什么会这样?
我知道 ShardAllocationStrategy 从 2.6.9 到 2.6.10 发生了变化。
发行说明:- https://akka.io/blog/news/2020/10/09/akka-2.6.10-released
文档:- https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#shard-allocation
您可以让自定义的 MyShardAllocationStrategy 继承 StartableAllocationStrategy,并在其 start() 方法中调用被包装策略的 start(system)。此外,您需要将 shardAllocationStrategy 参数的类型改为 LeastShardAllocationStrategy,这样才能调用它的 start 方法。
完整代码供参考:
import akka.actor.{ActorLogging, ActorRef, ActorSystem, PoisonPill, Props}
import akka.cluster.Cluster
import akka.cluster.sharding.ShardCoordinator.{LeastShardAllocationStrategy, ShardAllocationStrategy, StartableAllocationStrategy}
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import scala.concurrent.Future
import scala.util.Random
/**
 * Corrected sample for Akka 2.6.10: identical to the failing program except
 * that the custom allocation strategy now
 *   - extends `StartableAllocationStrategy`, and
 *   - wraps a concrete `LeastShardAllocationStrategy` so that it can forward
 *     `start` to it.
 *
 * The `LeastShardAllocationStrategy(10, 3)` arguments are kept the same as in
 * the failing program so that the two changes above are the only differences.
 */
object MyActorSharding extends App {

  /**
   * Sharded entity: adds every received `Int` to `index`; on `"print"` it
   * prints the current total followed by its own actor path.
   */
  class Actor extends akka.actor.Actor with ActorLogging {
    private var index = 0

    override def receive: Receive = {
      case x: Int =>
        index += x
      case "print" =>
        println("???????????????" + index)
        println(self.path.toString)
    }
  }

  /**
   * Logging wrapper around a `LeastShardAllocationStrategy`.
   *
   * Each call prints a marker line and then delegates to the wrapped strategy.
   *
   * @param shardAllocationStrategy the strategy that makes the actual decisions;
   *                                typed as `LeastShardAllocationStrategy`
   *                                (not the base trait) so that `start(system)`
   *                                can be called on it
   */
  class MyShardAllocationStrategy(shardAllocationStrategy: LeastShardAllocationStrategy)
      extends StartableAllocationStrategy {

    override def allocateShard(
        requester: ActorRef,
        shardId: ShardId,
        currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]]): Future[ActorRef] = {
      println("Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..")
      shardAllocationStrategy.allocateShard(requester, shardId, currentShardAllocations)
    }

    override def rebalance(
        currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]],
        rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
      println("Rebalance Shard ->>>>>>>>>>>>>>>>...")
      shardAllocationStrategy.rebalance(currentShardAllocations, rebalanceInProgress)
    }

    /**
     * Forwards the start signal to the wrapped strategy, handing it the actor
     * system of the enclosing object.
     *
     * NOTE(review): per Akka's `StartableAllocationStrategy` scaladoc the shard
     * coordinator is expected to call this before the first
     * `allocateShard`/`rebalance`; without it the wrapped strategy is used
     * un-started, which is what the failing version ran into.
     */
    override def start(): Unit = {
      shardAllocationStrategy.start(system)
    }
  }

  // Entity-id extractor: accepts any message and builds the id from the message
  // text plus a random number in [0, 34); the message is forwarded unchanged.
  val extractEntityId: PartialFunction[Any, (String, Any)] = {
    case message =>
      (
        s"${message.toString} -> entityId -> ${new Random().nextInt(34)}",
        message
      )
  }

  // Shard-id extractor: message text plus an independent random number in [0, 34).
  val shardId: Any => String = x =>
    s"${x.toString} -> shardId -> ${new Random().nextInt(34)}"

  val actorProps: Props = Props[Actor]

  // `system` must be defined before the shard region is started below, because
  // MyShardAllocationStrategy.start() reads it.
  val system = ActorSystem("ClusterSystem")
  val cluster = Cluster(system)
  val clusterSharding = ClusterSharding(system)

  // Start the shard region for "MyActor" with the startable wrapper.
  val actorRef = clusterSharding.start(
    "MyActor",
    actorProps,
    ClusterShardingSettings(system),
    extractEntityId,
    shardId,
    // Same tuning values as the original (failing) program.
    new MyShardAllocationStrategy(new LeastShardAllocationStrategy(10, 3)),
    PoisonPill
  )

  // Drive the sample: three increments followed by a print request.
  actorRef ! 12
  actorRef ! 13
  actorRef ! 14
  actorRef ! "print"
}
在将 Akka 版本从 2.6.9 升级到 2.6.10 之后,我遇到了 java.lang.NullPointerException。
下面是能够复现该错误的示例代码:
- akka-sharding/src/main/resources/application.conf
# Akka configuration for the node started by the reproduction program.
akka {
actor {
# Use the cluster-aware actor-ref provider.
provider = "cluster"
}
remote.artery {
canonical {
# Host and port this node advertises for Artery remoting.
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
# Two seed nodes on localhost; this configuration itself binds to port 2551.
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:2551",
"akka://ClusterSystem@127.0.0.1:2552"]
# Downing decisions are delegated to the Split Brain Resolver provider class.
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
- akka-sharding/src/main/scala/MyActorSharding.scala
import akka.actor.{ActorLogging, ActorRef, ActorSystem, PoisonPill, Props}
import akka.cluster.Cluster
import akka.cluster.sharding.ShardCoordinator.{LeastShardAllocationStrategy, ShardAllocationStrategy}
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import scala.concurrent.Future
import scala.util.Random
/**
 * Reproducer from the question: a cluster-sharding demo whose custom allocation
 * strategy wraps another strategy only to print a line on each call.
 */
object MyActorSharding extends App {
// Entity actor: adds every received Int to `index`; on "print" it prints the
// current total and its own actor path.
class Actor extends akka.actor.Actor with ActorLogging {
private var index = 0
override def receive: Receive = {
case x: Int =>
index += x
case "print" =>
println("???????????????" + index)
println(self.path.toString)
}
}
// Logging decorator around another ShardAllocationStrategy: each method prints
// a trace line and forwards its arguments unchanged to the wrapped strategy.
// NOTE(review): this wrapper extends only the plain ShardAllocationStrategy
// trait and never calls any start/initialisation method on the wrapped
// strategy - presumably why the delegate throws NullPointerException on
// Akka 2.6.10; confirm against the Akka 2.6.10 release notes.
class MyShardAllocationStrategy(shardAllocationStrategy: ShardAllocationStrategy)
extends ShardAllocationStrategy {
override def allocateShard(requester: ActorRef,
shardId: ShardId,
currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]]): Future[ActorRef] = {
println("Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..")
shardAllocationStrategy.allocateShard(requester, shardId, currentShardAllocations)
}
override def rebalance(currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]],
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
println("Rebalance Shard ->>>>>>>>>>>>>>>>...")
shardAllocationStrategy.rebalance(currentShardAllocations, rebalanceInProgress)
}
}
// Pairs every message with an entity id built from the message's toString and
// a random integer in [0, 34). A fresh Random is drawn on every call, so the
// same message value can receive a different entity id each time.
val extractEntityId: PartialFunction[Any, (String, Any)] = {
case message =>
(
s"${message.toString} -> entityId -> ${new Random().nextInt(34)}",
message
)
}
// Shard id built the same way: message toString plus a random integer in [0, 34).
val shardId: Any => String = x =>
s"${x.toString} -> shardId -> ${new Random().nextInt(34)}"
val actorProps: Props = Props[Actor]
val system = ActorSystem("ClusterSystem")
val cluster = Cluster(system)
val clusterSharding = ClusterSharding(system)
// Starts sharding for the "MyActor" type name with the extractors above and the
// logging wrapper around LeastShardAllocationStrategy(10, 3).
val actorRef = clusterSharding.start(
"MyActor",
actorProps,
ClusterShardingSettings(system),
extractEntityId,
shardId,
new MyShardAllocationStrategy(new LeastShardAllocationStrategy(10, 3)),
PoisonPill
)
// Send three Int messages followed by "print" to the ActorRef returned by
// clusterSharding.start.
actorRef ! 12
actorRef ! 13
actorRef ! 14
actorRef ! "print"
}
使用 Akka 2.6.9 时:
akka-sharding/build.sbt
// Single Akka version shared by both Akka dependencies below (2.6.9 in this run).
val akkaVersion = "2.6.9"
// Root project "akka-sharding", built with Scala 2.13.5.
lazy val root = (project in file("."))
.settings(
name := "akka-sharding",
version := "0.1",
scalaVersion := "2.13.5",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion
)
)
得到的输出如下:
[INFO] [05/04/2021 15:37:29.970] [main] [ArteryTcpTransport(akka://ClusterSystem)] Remoting started with transport [Artery tcp]; listening on address [akka://ClusterSystem@127.0.0.1:2551] with UID [3133807086855433188]
[INFO] [05/04/2021 15:37:29.990] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Starting up, Akka version [2.6.9] ...
[INFO] [05/04/2021 15:37:30.232] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [05/04/2021 15:37:30.233] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Started up successfully
[INFO] [05/04/2021 15:37:30.327] [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] SBR started. Config: stableAfter: 20000 ms, strategy: KeepMajority, selfUniqueAddress: UniqueAddress(akka://ClusterSystem@127.0.0.1:2551,3133807086855433188), selfDc: default
[INFO] [05/04/2021 15:37:30.756] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Idle entities will be passivated after [2.000 min]
[WARN] [05/04/2021 15:37:30.806] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552], control stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[WARN] [05/04/2021 15:37:30.807] [ClusterSystem-akka.remote.default-remote-dispatcher-8] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552], message stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[INFO] [akkaMemberChanged][05/04/2021 15:37:35.428] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Node [akka://ClusterSystem@127.0.0.1:2551] is JOINING itself (with roles [dc-default]) and forming new cluster
[INFO] [05/04/2021 15:37:35.430] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[INFO] [akkaMemberChanged][05/04/2021 15:37:35.435] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [05/04/2021 15:37:35.443] [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[INFO] [akkaClusterSingletonStarted][05/04/2021 15:37:35.449] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager starting singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:37:35.451] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Start -> Oldest]
[INFO] [05/04/2021 15:37:35.462] [ClusterSystem-akka.actor.default-dispatcher-11] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] ShardCoordinator was moved to the active state State(Map())
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
???????????????0
akka://ClusterSystem/system/sharding/MyActor/print+-%3E+shardId+-%3E+26/print+-%3E+entityId+-%3E+1
Rebalance Shard ->>>>>>>>>>>>>>>>...
[WARN] [05/04/2021 15:37:50.452] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [Association(akka://ClusterSystem)] Outbound control stream to [akka://ClusterSystem@127.0.0.1:2552] failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://ClusterSystem@127.0.0.1:2552] did not complete within 20000 ms
Rebalance Shard ->>>>>>>>>>>>>>>>...
Rebalance Shard ->>>>>>>>>>>>>>>>...
Rebalance Shard ->>>>>>>>>>>>>>>>...
[INFO] [05/04/2021 15:38:18.737] [ClusterSystem-akka.actor.default-dispatcher-11] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] Starting rebalance for shards [print -> shardId -> 26,14 -> shardId -> 10,13 -> shardId -> 4,12 -> shardId -> 20]. Current shards rebalancing: []
[INFO] [akkaMemberChanged][05/04/2021 15:38:18.756] [ClusterSystem-akka.actor.internal-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Marked address [akka://ClusterSystem@127.0.0.1:2551] as [Leaving]
[INFO] [05/04/2021 15:38:18.759] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Exited [akka://ClusterSystem@127.0.0.1:2551]
[INFO] [05/04/2021 15:38:18.759] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Oldest observed OldestChanged: [akka://ClusterSystem@127.0.0.1:2551 -> None]
[INFO] [05/04/2021 15:38:18.760] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Oldest -> WasOldest]
[INFO] [akkaMemberChanged][05/04/2021 15:38:19.101] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Exiting]
[INFO] [05/04/2021 15:38:19.771] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager stopping singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:38:19.771] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [WasOldest -> Stopping]
[INFO] [akkaClusterSingletonTerminated][05/04/2021 15:38:19.773] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton] was terminated
[INFO] [05/04/2021 15:38:19.778] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Exiting completed
[INFO] [05/04/2021 15:38:19.779] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Shutting down...
[INFO] [05/04/2021 15:38:19.780] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Successfully shut down
[INFO] [akkaDeadLetter][05/04/2021 15:38:19.780] [ClusterSystem-akka.actor.default-dispatcher-26] [akka://ClusterSystem/system/clusterEventBusListener] Message [akka.cluster.ClusterEvent$SeenChanged] to Actor[akka://ClusterSystem/system/clusterEventBusListener#636946749] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [05/04/2021 15:38:19.785] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Shutting down remote daemon.
[INFO] [05/04/2021 15:38:19.785] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [05/04/2021 15:38:19.799] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.
而使用 Akka 2.6.10 时:
- akka-sharding/build.sbt
// Single Akka version shared by both Akka dependencies below (2.6.10 in this run).
val akkaVersion = "2.6.10"
// Root project "akka-sharding", built with Scala 2.13.5.
lazy val root = (project in file("."))
.settings(
name := "akka-sharding",
version := "0.1",
scalaVersion := "2.13.5",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion
)
)
我在 rebalance 和 allocateShard 方法中都遇到了 NullPointerException,输出如下:
[INFO] [05/04/2021 15:41:44.061] [main] [ArteryTcpTransport(akka://ClusterSystem)] Remoting started with transport [Artery tcp]; listening on address [akka://ClusterSystem@127.0.0.1:2551] with UID [-8668970757003113418]
[INFO] [05/04/2021 15:41:44.081] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Starting up, Akka version [2.6.10] ...
[INFO] [05/04/2021 15:41:44.198] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [05/04/2021 15:41:44.198] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Started up successfully
[INFO] [05/04/2021 15:41:44.246] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] SBR started. Config: strategy [KeepMajority], stable-after [20 seconds], down-all-when-unstable [15 seconds], selfUniqueAddress [akka://ClusterSystem@127.0.0.1:2551#-8668970757003113418], selfDc [default].
[INFO] [05/04/2021 15:41:44.496] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Idle entities will be passivated after [2.000 min]
[WARN] [05/04/2021 15:41:44.541] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552], message stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[WARN] [05/04/2021 15:41:44.541] [ClusterSystem-akka.remote.default-remote-dispatcher-9] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552], control stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[INFO] [akkaMemberChanged][05/04/2021 15:41:49.349] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Node [akka://ClusterSystem@127.0.0.1:2551] is JOINING itself (with roles [dc-default], version [0.0.0]) and forming new cluster
[INFO] [05/04/2021 15:41:49.351] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[INFO] [akkaMemberChanged][05/04/2021 15:41:49.357] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [05/04/2021 15:41:49.367] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[INFO] [akkaClusterSingletonStarted][05/04/2021 15:41:49.372] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager starting singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:41:49.372] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Start -> Oldest]
[INFO] [05/04/2021 15:41:49.384] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
[ERROR] [05/04/2021 15:41:49.655] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] null
java.lang.NullPointerException
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.clusterState(AbstractLeastShardAllocationStrategy.scala:78)
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.regionEntriesFor(AbstractLeastShardAllocationStrategy.scala:124)
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.allocateShard(AbstractLeastShardAllocationStrategy.scala:85)
at MyActorSharding$MyShardAllocationStrategy.allocateShard(MyActorSharding.scala:31)
at akka.cluster.sharding.ShardCoordinator$$anonfun$active.applyOrElse(ShardCoordinator.scala:748)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:244)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.cluster.sharding.DDataShardCoordinator.akka$actor$Timers$$super$aroundReceive(ShardCoordinator.scala:1335)
at akka.actor.Timers.aroundReceive(Timers.scala:56)
at akka.actor.Timers.aroundReceive$(Timers.scala:41)
at akka.cluster.sharding.DDataShardCoordinator.aroundReceive(ShardCoordinator.scala:1335)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
[INFO] [akkaDeadLetter][05/04/2021 15:41:49.669] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] Message [akka.cluster.sharding.ShardCoordinator$Internal$GetShardHome] from Actor[akka://ClusterSystem/system/sharding/MyActor#-1153927922] to Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][05/04/2021 15:41:49.669] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] Message [akka.cluster.sharding.ShardCoordinator$Internal$GetShardHome] from Actor[akka://ClusterSystem/system/sharding/MyActor#-1153927922] to Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829] was unhandled. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][05/04/2021 15:41:49.670] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] Message [akka.cluster.sharding.ShardCoordinator$Internal$GetShardHome] from Actor[akka://ClusterSystem/system/sharding/MyActor#-1153927922] to Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829] was unhandled. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [05/04/2021 15:41:50.171] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
[WARN] [05/04/2021 15:41:54.591] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Requested shard homes [12 -> shardId -> 7,13 -> shardId -> 33,14 -> shardId -> 13,print -> shardId -> 29] from coordinator at [Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829]]. [4] total buffered messages.
Rebalance Shard ->>>>>>>>>>>>>>>>...
[ERROR] [05/04/2021 15:41:59.672] [ClusterSystem-akka.actor.default-dispatcher-22] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] null
java.lang.NullPointerException
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.clusterState(AbstractLeastShardAllocationStrategy.scala:78)
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.regionEntriesFor(AbstractLeastShardAllocationStrategy.scala:124)
at akka.cluster.sharding.ShardCoordinator$LeastShardAllocationStrategy.rebalance(ShardCoordinator.scala:294)
at MyActorSharding$MyShardAllocationStrategy.rebalance(MyActorSharding.scala:37)
at akka.cluster.sharding.ShardCoordinator$$anonfun$active.applyOrElse(ShardCoordinator.scala:790)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:244)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.cluster.sharding.DDataShardCoordinator.akka$actor$Timers$$super$aroundReceive(ShardCoordinator.scala:1335)
at akka.actor.Timers.aroundReceive(Timers.scala:56)
at akka.actor.Timers.aroundReceive$(Timers.scala:41)
at akka.cluster.sharding.DDataShardCoordinator.aroundReceive(ShardCoordinator.scala:1335)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
...
[INFO] [05/04/2021 15:42:00.191] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
[WARN] [05/04/2021 15:42:04.352] [ClusterSystem-akka.remote.default-remote-dispatcher-9] [Association(akka://ClusterSystem)] Outbound control stream to [akka://ClusterSystem@127.0.0.1:2552] failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://ClusterSystem@127.0.0.1:2552] did not complete within 20000 ms
[WARN] [05/04/2021 15:42:04.671] [ClusterSystem-akka.actor.internal-dispatcher-4] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Requested shard homes [12 -> shardId -> 7,13 -> shardId -> 33,14 -> shardId -> 13,print -> shardId -> 29] from coordinator at [Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829]]. [4] total buffered messages.
Rebalance Shard ->>>>>>>>>>>>>>>>...
[ERROR] [05/04/2021 15:42:09.692] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] null
java.lang.NullPointerException
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.clusterState(AbstractLeastShardAllocationStrategy.scala:78)
at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.regionEntriesFor(AbstractLeastShardAllocationStrategy.scala:124)
at akka.cluster.sharding.ShardCoordinator$LeastShardAllocationStrategy.rebalance(ShardCoordinator.scala:294)
at MyActorSharding$MyShardAllocationStrategy.rebalance(MyActorSharding.scala:37)
at akka.cluster.sharding.ShardCoordinator$$anonfun$active.applyOrElse(ShardCoordinator.scala:790)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:244)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.cluster.sharding.DDataShardCoordinator.akka$actor$Timers$$super$aroundReceive(ShardCoordinator.scala:1335)
at akka.actor.Timers.aroundReceive(Timers.scala:56)
at akka.actor.Timers.aroundReceive$(Timers.scala:41)
at akka.cluster.sharding.DDataShardCoordinator.aroundReceive(ShardCoordinator.scala:1335)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
...
[INFO] [05/04/2021 15:42:10.211] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
[WARN] [05/04/2021 15:42:14.771] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Requested shard homes [12 -> shardId -> 7,13 -> shardId -> 33,14 -> shardId -> 13,print -> shardId -> 29] from coordinator at [Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829]]. [4] total buffered messages.
[WARN] [05/04/2021 15:42:15.521] [ClusterSystem-akka.actor.internal-dispatcher-2] [CoordinatedShutdown(akka://ClusterSystem)] Coordinated shutdown phase [cluster-sharding-shutdown-region] timed out after 10000 milliseconds
[INFO] [akkaMemberChanged][05/04/2021 15:42:15.524] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Marked address [akka://ClusterSystem@127.0.0.1:2551] as [Leaving]
[INFO] [05/04/2021 15:42:15.527] [ClusterSystem-akka.actor.internal-dispatcher-4] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Exited [akka://ClusterSystem@127.0.0.1:2551]
[INFO] [05/04/2021 15:42:15.528] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Oldest observed OldestChanged: [akka://ClusterSystem@127.0.0.1:2551 -> None]
[INFO] [05/04/2021 15:42:15.529] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Oldest -> WasOldest]
[INFO] [akkaMemberChanged][05/04/2021 15:42:15.771] [ClusterSystem-akka.actor.internal-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Exiting]
[INFO] [05/04/2021 15:42:16.541] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager stopping singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:42:16.541] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [WasOldest -> Stopping]
[INFO] [akkaClusterSingletonTerminated][05/04/2021 15:42:16.544] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton] was terminated
[WARN] [05/04/2021 15:42:16.544] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Trying to register to coordinator at [ActorSelection[Anchor(akka://ClusterSystem/), Path(/system/sharding/MyActorCoordinator/singleton/coordinator)]], but no acknowledgement. Total [4] buffered messages. [Coordinator [Member(akka://ClusterSystem@127.0.0.1:2551, Exiting)] is reachable.]
[INFO] [05/04/2021 15:42:16.548] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Exiting completed
[INFO] [05/04/2021 15:42:16.549] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Shutting down...
[INFO] [05/04/2021 15:42:16.550] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Successfully shut down
[INFO] [05/04/2021 15:42:16.557] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Shutting down remote daemon.
[INFO] [05/04/2021 15:42:16.558] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [05/04/2021 15:42:16.573] [ClusterSystem-akka.remote.default-remote-dispatcher-9] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.
谁能告诉我为什么会这样?
我知道从 2.6.9 到 2.6.10,ShardAllocationStrategy 发生了变化。
发行说明:- https://akka.io/blog/news/2020/10/09/akka-2.6.10-released 文档:- https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#shard-allocation
您可以让自定义的 MyShardAllocationStrategy 继承 StartableAllocationStrategy,并在其 start() 方法中调用被包装策略的 start(system)。
此外,还需要将 shardAllocationStrategy 参数的类型改为 LeastShardAllocationStrategy。
完整代码供参考:
import akka.actor.{ActorLogging, ActorRef, ActorSystem, PoisonPill, Props}
import akka.cluster.Cluster
import akka.cluster.sharding.ShardCoordinator.{LeastShardAllocationStrategy, ShardAllocationStrategy, StartableAllocationStrategy}
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import scala.concurrent.Future
import scala.util.Random
/**
 * Cluster-sharding demo whose custom allocation strategy extends
 * StartableAllocationStrategy and starts the LeastShardAllocationStrategy it wraps.
 */
object MyActorSharding extends App {
// Entity actor: adds every received Int to `index`; on "print" it prints the
// current total and its own actor path.
class Actor extends akka.actor.Actor with ActorLogging {
private var index = 0
override def receive: Receive = {
case x: Int =>
index += x
case "print" =>
println("???????????????" + index)
println(self.path.toString)
}
}
// Logging decorator around a LeastShardAllocationStrategy. Besides forwarding
// allocateShard and rebalance, it overrides start() to start the wrapped
// strategy.
class MyShardAllocationStrategy(shardAllocationStrategy: LeastShardAllocationStrategy)
extends StartableAllocationStrategy {
override def allocateShard(requester: ActorRef,
shardId: ShardId,
currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]]): Future[ActorRef] = {
println("Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..")
shardAllocationStrategy.allocateShard(requester, shardId, currentShardAllocations)
}
override def rebalance(currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]],
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
println("Rebalance Shard ->>>>>>>>>>>>>>>>...")
shardAllocationStrategy.rebalance(currentShardAllocations, rebalanceInProgress)
}
// Starts the wrapped strategy with `system`, the val declared further down in
// the enclosing object.
// NOTE(review): start() must not run before that val has been initialised -
// confirm when the sharding machinery invokes it.
override def start(): Unit = {
shardAllocationStrategy.start(system)
}
}
// Pairs every message with an entity id built from the message's toString and
// a random integer in [0, 34). A fresh Random is drawn on every call, so the
// same message value can receive a different entity id each time.
val extractEntityId: PartialFunction[Any, (String, Any)] = {
case message =>
(
s"${message.toString} -> entityId -> ${new Random().nextInt(34)}",
message
)
}
// Shard id built the same way: message toString plus a random integer in [0, 34).
val shardId: Any => String = x =>
s"${x.toString} -> shardId -> ${new Random().nextInt(34)}"
val actorProps: Props = Props[Actor]
val system = ActorSystem("ClusterSystem")
val cluster = Cluster(system)
val clusterSharding = ClusterSharding(system)
// Starts sharding for the "MyActor" type name with the extractors above and the
// startable logging wrapper around LeastShardAllocationStrategy(1, 1).
val actorRef = clusterSharding.start(
"MyActor",
actorProps,
ClusterShardingSettings(system),
extractEntityId,
shardId,
new MyShardAllocationStrategy(new LeastShardAllocationStrategy(1, 1)),
PoisonPill
)
// Send three Int messages followed by "print" to the ActorRef returned by
// clusterSharding.start.
actorRef ! 12
actorRef ! 13
actorRef ! 14
actorRef ! "print"
}