当从 Akka-2.6.9 更新到 Akka 2.6.10 时,ShardCoordinator$LeastShardAllocationStrategy 出现 NullPointerException

NullPointerException in ShardCoordinator$LeastShardAllocationStrategy when updated to Akka 2.6.10 from Akka-2.6.9

我将 Akka 版本从 2.6.9 升级到 2.6.10 之后,遇到了 java.lang.NullPointerException。

下面是能够复现此错误的示例代码:

  1. akka-sharding/src/main/resources/application.conf
# Akka settings for the sample node: cluster actor provider, Artery remoting
# on 127.0.0.1:2551, two seed nodes and the split-brain-resolver downing provider.
akka {
  actor {
    provider = "cluster"
  }
  remote.artery {
    # Canonical address of this node.
    canonical {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    # The first seed node is this node's own canonical address; the second one
    # uses port 2552.
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:2551",
      "akka://ClusterSystem@127.0.0.1:2552"]
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}
  2. akka-sharding/src/main/scala/MyActorSharding.scala
import akka.actor.{ActorLogging, ActorRef, ActorSystem, PoisonPill, Props}
import akka.cluster.Cluster
import akka.cluster.sharding.ShardCoordinator.{LeastShardAllocationStrategy, ShardAllocationStrategy}
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import scala.concurrent.Future
import scala.util.Random

/**
 * Reproduction program: starts the "MyActor" shard region with a custom
 * allocation strategy that wraps `LeastShardAllocationStrategy`, then sends
 * three `Int` messages and one `"print"` message to the region.
 */
object MyActorSharding extends App {

  /** Entity actor that adds up received `Int`s and prints the total on `"print"`. */
  class Actor extends akka.actor.Actor with ActorLogging {

    // Running sum of all Int messages received by this entity instance.
    private var index = 0

    override def receive: Receive = {
      case x: Int =>
        index += x
      case "print" =>
        println("???????????????" + index)
        println(self.path.toString)
    }
  }

  /**
   * Allocation strategy that prints a marker line and then forwards the call
   * to the wrapped strategy.
   *
   * NOTE(review): only `allocateShard` and `rebalance` are forwarded. If the
   * wrapped strategy needs a separate start/initialisation call, nothing here
   * makes it -- presumably related to the NullPointerException reported with
   * Akka 2.6.10; confirm against the Akka release notes.
   *
   * @param shardAllocationStrategy delegate that makes the actual decisions
   */
  class MyShardAllocationStrategy(shardAllocationStrategy: ShardAllocationStrategy)
  extends ShardAllocationStrategy {
    /** Prints a marker line, then delegates the allocation decision. */
    override def allocateShard(requester: ActorRef,
                               shardId: ShardId,
                               currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]]): Future[ActorRef] = {
      println("Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..")
      shardAllocationStrategy.allocateShard(requester, shardId, currentShardAllocations)
    }

    /** Prints a marker line, then delegates the rebalance decision. */
    override def rebalance(currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]],
                           rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
      println("Rebalance Shard ->>>>>>>>>>>>>>>>...")
      shardAllocationStrategy.rebalance(currentShardAllocations, rebalanceInProgress)
    }
  }

  // Maps every message to (entityId, message). The entity id embeds a fresh
  // random number below 34 on every call, so the same message does not
  // necessarily yield the same id twice.
  // NOTE(review): this looks intentional for the demo -- confirm.
  val extractEntityId: PartialFunction[Any, (String, Any)] = {
    case message =>
      (
        s"${message.toString} -> entityId -> ${new Random().nextInt(34)}",
        message
      )
  }

  // Builds the shard id from the message text plus a fresh random number below 34.
  val shardId: Any => String = x =>
    s"${x.toString} -> shardId -> ${new Random().nextInt(34)}"

  val actorProps: Props = Props[Actor]

  val system = ActorSystem("ClusterSystem")
  // Not referenced again in this snippet.
  val cluster = Cluster(system)
  val clusterSharding = ClusterSharding(system)
  // Starts the "MyActor" shard region with the logging wrapper around
  // LeastShardAllocationStrategy(10, 3); PoisonPill is the hand-off stop message.
  val actorRef = clusterSharding.start(
    "MyActor",
    actorProps,
    ClusterShardingSettings(system),
    extractEntityId,
    shardId,
    new MyShardAllocationStrategy(new LeastShardAllocationStrategy(10, 3)),
    PoisonPill
  )

  // Demo traffic sent through the shard region.
  actorRef ! 12
  actorRef ! 13
  actorRef ! 14
  actorRef ! "print"

}

使用 Akka 2.6.9 时:

akka-sharding/build.sbt

// Akka version shared by both Akka modules listed below.
val akkaVersion = "2.6.9"

// Root project of the sample; depends on akka-cluster and akka-cluster-sharding.
lazy val root = (project in file("."))
  .settings(
    name := "akka-sharding",
    version := "0.1",
    scalaVersion := "2.13.5",
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
      "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion
    )
  )

我得到的输出为:

[INFO] [05/04/2021 15:37:29.970] [main] [ArteryTcpTransport(akka://ClusterSystem)] Remoting started with transport [Artery tcp]; listening on address [akka://ClusterSystem@127.0.0.1:2551] with UID [3133807086855433188]
[INFO] [05/04/2021 15:37:29.990] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Starting up, Akka version [2.6.9] ...
[INFO] [05/04/2021 15:37:30.232] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [05/04/2021 15:37:30.233] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Started up successfully
[INFO] [05/04/2021 15:37:30.327] [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] SBR started. Config: stableAfter: 20000 ms, strategy: KeepMajority, selfUniqueAddress: UniqueAddress(akka://ClusterSystem@127.0.0.1:2551,3133807086855433188), selfDc: default
[INFO] [05/04/2021 15:37:30.756] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Idle entities will be passivated after [2.000 min]
[WARN] [05/04/2021 15:37:30.806] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552], control stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[WARN] [05/04/2021 15:37:30.807] [ClusterSystem-akka.remote.default-remote-dispatcher-8] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552], message stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[INFO] [akkaMemberChanged][05/04/2021 15:37:35.428] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Node [akka://ClusterSystem@127.0.0.1:2551] is JOINING itself (with roles [dc-default]) and forming new cluster
[INFO] [05/04/2021 15:37:35.430] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[INFO] [akkaMemberChanged][05/04/2021 15:37:35.435] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [05/04/2021 15:37:35.443] [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[INFO] [akkaClusterSingletonStarted][05/04/2021 15:37:35.449] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager starting singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:37:35.451] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Start -> Oldest]
[INFO] [05/04/2021 15:37:35.462] [ClusterSystem-akka.actor.default-dispatcher-11] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] ShardCoordinator was moved to the active state State(Map())
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
???????????????0
akka://ClusterSystem/system/sharding/MyActor/print+-%3E+shardId+-%3E+26/print+-%3E+entityId+-%3E+1
Rebalance Shard ->>>>>>>>>>>>>>>>...
[WARN] [05/04/2021 15:37:50.452] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [Association(akka://ClusterSystem)] Outbound control stream to [akka://ClusterSystem@127.0.0.1:2552] failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://ClusterSystem@127.0.0.1:2552] did not complete within 20000 ms
Rebalance Shard ->>>>>>>>>>>>>>>>...
Rebalance Shard ->>>>>>>>>>>>>>>>...
Rebalance Shard ->>>>>>>>>>>>>>>>...
[INFO] [05/04/2021 15:38:18.737] [ClusterSystem-akka.actor.default-dispatcher-11] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] Starting rebalance for shards [print -> shardId -> 26,14 -> shardId -> 10,13 -> shardId -> 4,12 -> shardId -> 20]. Current shards rebalancing: []
[INFO] [akkaMemberChanged][05/04/2021 15:38:18.756] [ClusterSystem-akka.actor.internal-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Marked address [akka://ClusterSystem@127.0.0.1:2551] as [Leaving]
[INFO] [05/04/2021 15:38:18.759] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Exited [akka://ClusterSystem@127.0.0.1:2551]
[INFO] [05/04/2021 15:38:18.759] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Oldest observed OldestChanged: [akka://ClusterSystem@127.0.0.1:2551 -> None]
[INFO] [05/04/2021 15:38:18.760] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Oldest -> WasOldest]
[INFO] [akkaMemberChanged][05/04/2021 15:38:19.101] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Exiting]
[INFO] [05/04/2021 15:38:19.771] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager stopping singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:38:19.771] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [WasOldest -> Stopping]
[INFO] [akkaClusterSingletonTerminated][05/04/2021 15:38:19.773] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton] was terminated
[INFO] [05/04/2021 15:38:19.778] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Exiting completed
[INFO] [05/04/2021 15:38:19.779] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Shutting down...
[INFO] [05/04/2021 15:38:19.780] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Successfully shut down
[INFO] [akkaDeadLetter][05/04/2021 15:38:19.780] [ClusterSystem-akka.actor.default-dispatcher-26] [akka://ClusterSystem/system/clusterEventBusListener] Message [akka.cluster.ClusterEvent$SeenChanged] to Actor[akka://ClusterSystem/system/clusterEventBusListener#636946749] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [05/04/2021 15:38:19.785] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Shutting down remote daemon.
[INFO] [05/04/2021 15:38:19.785] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [05/04/2021 15:38:19.799] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.

而使用 Akka 2.6.10 时:

  1. akka-sharding/build.sbt
// Akka version shared by both Akka modules listed below.
val akkaVersion = "2.6.10"

// Root project of the sample; depends on akka-cluster and akka-cluster-sharding.
lazy val root = (project in file("."))
  .settings(
    name := "akka-sharding",
    version := "0.1",
    scalaVersion := "2.13.5",
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
      "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion
    )
  )

我在 allocateShard 和 rebalance 被调用时都收到了 java.lang.NullPointerException:
[INFO] [05/04/2021 15:41:44.061] [main] [ArteryTcpTransport(akka://ClusterSystem)] Remoting started with transport [Artery tcp]; listening on address [akka://ClusterSystem@127.0.0.1:2551] with UID [-8668970757003113418]
[INFO] [05/04/2021 15:41:44.081] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Starting up, Akka version [2.6.10] ...
[INFO] [05/04/2021 15:41:44.198] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [05/04/2021 15:41:44.198] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Started up successfully
[INFO] [05/04/2021 15:41:44.246] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] SBR started. Config: strategy [KeepMajority], stable-after [20 seconds], down-all-when-unstable [15 seconds], selfUniqueAddress [akka://ClusterSystem@127.0.0.1:2551#-8668970757003113418], selfDc [default].
[INFO] [05/04/2021 15:41:44.496] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Idle entities will be passivated after [2.000 min]
[WARN] [05/04/2021 15:41:44.541] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552], message stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[WARN] [05/04/2021 15:41:44.541] [ClusterSystem-akka.remote.default-remote-dispatcher-9] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552], control stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[INFO] [akkaMemberChanged][05/04/2021 15:41:49.349] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Node [akka://ClusterSystem@127.0.0.1:2551] is JOINING itself (with roles [dc-default], version [0.0.0]) and forming new cluster
[INFO] [05/04/2021 15:41:49.351] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[INFO] [akkaMemberChanged][05/04/2021 15:41:49.357] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [05/04/2021 15:41:49.367] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[INFO] [akkaClusterSingletonStarted][05/04/2021 15:41:49.372] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager starting singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:41:49.372] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Start -> Oldest]
[INFO] [05/04/2021 15:41:49.384] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
[ERROR] [05/04/2021 15:41:49.655] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] null
java.lang.NullPointerException
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.clusterState(AbstractLeastShardAllocationStrategy.scala:78)
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.regionEntriesFor(AbstractLeastShardAllocationStrategy.scala:124)
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.allocateShard(AbstractLeastShardAllocationStrategy.scala:85)
    at MyActorSharding$MyShardAllocationStrategy.allocateShard(MyActorSharding.scala:31)
    at akka.cluster.sharding.ShardCoordinator$$anonfun$active.applyOrElse(ShardCoordinator.scala:748)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:244)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.cluster.sharding.DDataShardCoordinator.akka$actor$Timers$$super$aroundReceive(ShardCoordinator.scala:1335)
    at akka.actor.Timers.aroundReceive(Timers.scala:56)
    at akka.actor.Timers.aroundReceive$(Timers.scala:41)
    at akka.cluster.sharding.DDataShardCoordinator.aroundReceive(ShardCoordinator.scala:1335)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

[INFO] [akkaDeadLetter][05/04/2021 15:41:49.669] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] Message [akka.cluster.sharding.ShardCoordinator$Internal$GetShardHome] from Actor[akka://ClusterSystem/system/sharding/MyActor#-1153927922] to Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][05/04/2021 15:41:49.669] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] Message [akka.cluster.sharding.ShardCoordinator$Internal$GetShardHome] from Actor[akka://ClusterSystem/system/sharding/MyActor#-1153927922] to Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829] was unhandled. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][05/04/2021 15:41:49.670] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] Message [akka.cluster.sharding.ShardCoordinator$Internal$GetShardHome] from Actor[akka://ClusterSystem/system/sharding/MyActor#-1153927922] to Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829] was unhandled. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [05/04/2021 15:41:50.171] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
[WARN] [05/04/2021 15:41:54.591] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Requested shard homes [12 -> shardId -> 7,13 -> shardId -> 33,14 -> shardId -> 13,print -> shardId -> 29] from coordinator at [Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829]]. [4] total buffered messages.
Rebalance Shard ->>>>>>>>>>>>>>>>...
[ERROR] [05/04/2021 15:41:59.672] [ClusterSystem-akka.actor.default-dispatcher-22] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] null
java.lang.NullPointerException
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.clusterState(AbstractLeastShardAllocationStrategy.scala:78)
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.regionEntriesFor(AbstractLeastShardAllocationStrategy.scala:124)
    at akka.cluster.sharding.ShardCoordinator$LeastShardAllocationStrategy.rebalance(ShardCoordinator.scala:294)
    at MyActorSharding$MyShardAllocationStrategy.rebalance(MyActorSharding.scala:37)
    at akka.cluster.sharding.ShardCoordinator$$anonfun$active.applyOrElse(ShardCoordinator.scala:790)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:244)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.cluster.sharding.DDataShardCoordinator.akka$actor$Timers$$super$aroundReceive(ShardCoordinator.scala:1335)
    at akka.actor.Timers.aroundReceive(Timers.scala:56)
    at akka.actor.Timers.aroundReceive$(Timers.scala:41)
    at akka.cluster.sharding.DDataShardCoordinator.aroundReceive(ShardCoordinator.scala:1335)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ...

[INFO] [05/04/2021 15:42:00.191] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
[WARN] [05/04/2021 15:42:04.352] [ClusterSystem-akka.remote.default-remote-dispatcher-9] [Association(akka://ClusterSystem)] Outbound control stream to [akka://ClusterSystem@127.0.0.1:2552] failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://ClusterSystem@127.0.0.1:2552] did not complete within 20000 ms
[WARN] [05/04/2021 15:42:04.671] [ClusterSystem-akka.actor.internal-dispatcher-4] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Requested shard homes [12 -> shardId -> 7,13 -> shardId -> 33,14 -> shardId -> 13,print -> shardId -> 29] from coordinator at [Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829]]. [4] total buffered messages.
Rebalance Shard ->>>>>>>>>>>>>>>>...
[ERROR] [05/04/2021 15:42:09.692] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] null
java.lang.NullPointerException
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.clusterState(AbstractLeastShardAllocationStrategy.scala:78)
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.regionEntriesFor(AbstractLeastShardAllocationStrategy.scala:124)
    at akka.cluster.sharding.ShardCoordinator$LeastShardAllocationStrategy.rebalance(ShardCoordinator.scala:294)
    at MyActorSharding$MyShardAllocationStrategy.rebalance(MyActorSharding.scala:37)
    at akka.cluster.sharding.ShardCoordinator$$anonfun$active.applyOrElse(ShardCoordinator.scala:790)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:244)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.cluster.sharding.DDataShardCoordinator.akka$actor$Timers$$super$aroundReceive(ShardCoordinator.scala:1335)
    at akka.actor.Timers.aroundReceive(Timers.scala:56)
    at akka.actor.Timers.aroundReceive$(Timers.scala:41)
    at akka.cluster.sharding.DDataShardCoordinator.aroundReceive(ShardCoordinator.scala:1335)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ...

[INFO] [05/04/2021 15:42:10.211] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
[WARN] [05/04/2021 15:42:14.771] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Requested shard homes [12 -> shardId -> 7,13 -> shardId -> 33,14 -> shardId -> 13,print -> shardId -> 29] from coordinator at [Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829]]. [4] total buffered messages.
[WARN] [05/04/2021 15:42:15.521] [ClusterSystem-akka.actor.internal-dispatcher-2] [CoordinatedShutdown(akka://ClusterSystem)] Coordinated shutdown phase [cluster-sharding-shutdown-region] timed out after 10000 milliseconds
[INFO] [akkaMemberChanged][05/04/2021 15:42:15.524] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Marked address [akka://ClusterSystem@127.0.0.1:2551] as [Leaving]
[INFO] [05/04/2021 15:42:15.527] [ClusterSystem-akka.actor.internal-dispatcher-4] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Exited [akka://ClusterSystem@127.0.0.1:2551]
[INFO] [05/04/2021 15:42:15.528] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Oldest observed OldestChanged: [akka://ClusterSystem@127.0.0.1:2551 -> None]
[INFO] [05/04/2021 15:42:15.529] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Oldest -> WasOldest]
[INFO] [akkaMemberChanged][05/04/2021 15:42:15.771] [ClusterSystem-akka.actor.internal-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Exiting]
[INFO] [05/04/2021 15:42:16.541] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager stopping singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:42:16.541] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [WasOldest -> Stopping]
[INFO] [akkaClusterSingletonTerminated][05/04/2021 15:42:16.544] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton] was terminated
[WARN] [05/04/2021 15:42:16.544] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Trying to register to coordinator at [ActorSelection[Anchor(akka://ClusterSystem/), Path(/system/sharding/MyActorCoordinator/singleton/coordinator)]], but no acknowledgement. Total [4] buffered messages. [Coordinator [Member(akka://ClusterSystem@127.0.0.1:2551, Exiting)] is reachable.]
[INFO] [05/04/2021 15:42:16.548] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Exiting completed
[INFO] [05/04/2021 15:42:16.549] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Shutting down...
[INFO] [05/04/2021 15:42:16.550] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Successfully shut down
[INFO] [05/04/2021 15:42:16.557] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Shutting down remote daemon.
[INFO] [05/04/2021 15:42:16.558] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [05/04/2021 15:42:16.573] [ClusterSystem-akka.remote.default-remote-dispatcher-9] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.

谁能告诉我为什么会这样?

我知道 ShardAllocationStrategy 的实现在 2.6.9 到 2.6.10 之间发生了变化。

发行说明:https://akka.io/blog/news/2020/10/09/akka-2.6.10-released
文档:https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#shard-allocation

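从堆栈跟踪可以看出,NullPointerException 发生在 AbstractLeastShardAllocationStrategy.clusterState 中:在 2.6.10 中,LeastShardAllocationStrategy 必须先通过 start(system) 完成初始化才能使用,而您的包装类 MyShardAllocationStrategy 只转发了 allocateShard 和 rebalance,没有把启动调用转发给被包装的策略。解决办法是让 MyShardAllocationStrategy 继承 StartableAllocationStrategy,并在 start() 中调用被包装策略的 start(system)。此外,您需要将 shardAllocationStrategy 参数的类型更改为 LeastShardAllocationStrategy,因为 ShardAllocationStrategy 特质本身并没有 start 方法。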

完整代码供参考:

import akka.actor.{ActorLogging, ActorRef, ActorSystem, PoisonPill, Props}
import akka.cluster.Cluster
import akka.cluster.sharding.ShardCoordinator.{LeastShardAllocationStrategy, ShardAllocationStrategy, StartableAllocationStrategy}
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import scala.concurrent.Future
import scala.util.Random
/**
 * Cluster-sharding demo whose allocation strategy is a logging wrapper around
 * `LeastShardAllocationStrategy`.
 *
 * The wrapper extends `StartableAllocationStrategy`; its `start()` forwards to
 * the wrapped strategy's `start(system)`.
 */
object MyActorSharding extends App {

  /** Entity actor: sums up received `Int`s and prints the total on `"print"`. */
  class Actor extends akka.actor.Actor with ActorLogging {

    // Running total of every Int delivered to this entity instance.
    private var index = 0

    override def receive: Receive = {
      case "print" =>
        println("???????????????" + index)
        println(self.path.toString)
      case delta: Int =>
        index = index + delta
    }
  }

  /**
   * Prints a marker line for every allocation or rebalance request and then
   * hands the request to the wrapped strategy.
   *
   * @param shardAllocationStrategy delegate that makes the actual decisions
   */
  class MyShardAllocationStrategy(shardAllocationStrategy: LeastShardAllocationStrategy)
    extends StartableAllocationStrategy {

    /** Starts the delegate with the `system` of the enclosing object. */
    override def start(): Unit = shardAllocationStrategy.start(system)

    /** Logs the request, then lets the delegate choose the region. */
    override def allocateShard(
        requester: ActorRef,
        shardId: ShardId,
        currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]]): Future[ActorRef] = {
      println("Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..")
      shardAllocationStrategy.allocateShard(requester, shardId, currentShardAllocations)
    }

    /** Logs the request, then lets the delegate choose the shards to move. */
    override def rebalance(
        currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]],
        rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
      println("Rebalance Shard ->>>>>>>>>>>>>>>>...")
      shardAllocationStrategy.rebalance(currentShardAllocations, rebalanceInProgress)
    }
  }

  // Pairs every message with an entity id made of the message text and a
  // fresh random number below 34.
  val extractEntityId: PartialFunction[Any, (String, Any)] = {
    case msg =>
      s"${msg.toString} -> entityId -> ${new Random().nextInt(34)}" -> msg
  }

  // Shard id: the message text followed by a fresh random number below 34.
  val shardId: Any => String = { msg =>
    s"${msg.toString} -> shardId -> ${new Random().nextInt(34)}"
  }

  val actorProps: Props = Props[Actor]

  val system = ActorSystem("ClusterSystem")
  val cluster = Cluster(system)
  val clusterSharding = ClusterSharding(system)

  // Shard region for "MyActor", using the logging wrapper around
  // LeastShardAllocationStrategy(1, 1); PoisonPill is the hand-off stop message.
  val actorRef = {
    val allocation = new MyShardAllocationStrategy(new LeastShardAllocationStrategy(1, 1))
    clusterSharding.start(
      "MyActor",
      actorProps,
      ClusterShardingSettings(system),
      extractEntityId,
      shardId,
      allocation,
      PoisonPill)
  }

  // Demo traffic: three increments followed by a print request.
  Seq[Any](12, 13, 14, "print").foreach(actorRef ! _)
}