F# Akkling 无法通过分片代理发送消息
F# Akkling Unable to send message through sharding proxy
我尝试使用以下代码向 Akka.NET 的分片区域代理(shard region proxy)发送消息:
open Akkling.Cluster.Sharding
open Akka.Actor
open Akka.Cluster
open Akka.Cluster.Sharding
open System
open Akkling
/// Builds the Akka.NET configuration for one cluster node listening on `port`.
/// The HOCON selects the cluster actor provider, binds dot-netty to localhost,
/// gives the node the "Worker" role, points sharding at the in-memory journal
/// and snapshot plugins, and lists localhost:5000 as the seed node.
/// The cluster-singleton and cluster-sharding default configs are attached as
/// fallbacks, in that order.
let configWithPort (port:int) =
    // HOCON text up to the place where the port number is spliced in.
    let hoconHead = """
akka {
actor {
provider = cluster
}
remote {
dot-netty.tcp {
public-hostname = "localhost"
hostname = "localhost"
port = """
    // HOCON text that follows the port number.
    let hoconTail = """
}
}
cluster {
roles = ["Worker"]
sharding {
journal-plugin-id = "akka.persistence.journal.inmem"
snapshot-plugin-id = "akka.persistence.snapshot-store.inmem"
}
seed-nodes = [ "akka.tcp://cluster-system@localhost:5000" ]
}
}
"""
    let nodeConfig = Configuration.parse (hoconHead + port.ToString() + hoconTail)
    let singletonDefaults = Akka.Cluster.Tools.Singleton.ClusterSingletonManager.DefaultConfig()
    let shardingDefaults = ClusterSharding.DefaultConfig()
    nodeConfig.WithFallback(singletonDefaults).WithFallback(shardingDefaults)
// Two actor systems with the same name ("cluster-system") are started in this
// process. system1 listens on port 5000, the address configWithPort lists as
// the seed node; system2 listens on port 5001.
let system1 =
    let seedNodeConfig = configWithPort 5000
    ActorSystem.Create("cluster-system", seedNodeConfig)
let system2 =
    let secondNodeConfig = configWithPort 5001
    ActorSystem.Create("cluster-system", secondNodeConfig)
/// Domain
/// Message type handled by the sharded entity actors in this sample.
type FileCommand = {
/// Program identifier; extractorFunction uses it as the entity id.
ProgramId : string
/// Duration value carried with the command; not read by any code in view.
Duration : TimeSpan
/// File path; aggregateRootActor writes it to the log.
FilePath : string
}
/// Actors
/// Entity behavior: for every FileCommand received, logs the program id, the
/// file path and the unique cluster address of the node running this actor,
/// then reports the message as ignored.
let aggregateRootActor (mailbox:Actor<_>) (msg:FileCommand) =
    let cluster = Cluster.Get(mailbox.System)
    let { ProgramId = programId; FilePath = path } = msg
    logInfof mailbox "Program: [%s] with path [%s] on [%A]" programId path cluster.SelfUniqueAddress
    ignored ()
/// Maps a FileCommand to the (shardId, entityId, message) triple used to
/// route it: the entity id is the command's ProgramId and the shard id is
/// "shard_N" with N in 0..4, derived from a hash of the entity id.
let extractorFunction (message:FileCommand) =
    let entityId = message.ProgramId
    let numberOfShards = 5
    // String.GetHashCode() is not guaranteed to return the same value in
    // different processes or runtimes, so two nodes could place the same
    // entity in different shards; `abs` also has no valid result for
    // Int32.MinValue. A small deterministic hash, masked to stay
    // non-negative, avoids both problems.
    let stableHash =
        entityId
        |> Seq.fold (fun acc ch -> (acc * 31 + int ch) &&& 0x7FFFFFFF) 0
    let shardId = sprintf "shard_%d" (stableHash % numberOfShards)
    shardId, entityId, message
// One shard region named "fileRouter" is spawned on each actor system; both
// use the same extractor and the same entity behavior.
let region1 =
    let entityProps = props (actorOf2 aggregateRootActor)
    spawnSharded extractorFunction system1 "fileRouter" entityProps
let region2 =
    let entityProps = props (actorOf2 aggregateRootActor)
    spawnSharded extractorFunction system2 "fileRouter" entityProps
// Proxy as written in the question. NOTE(review): it is spawned under the name
// "fileRouterProxy" with no role, while the regions are named "fileRouter";
// this is the version whose sends end up in dead letters.
let shardRegionProxy = spawnShardedProxy extractorFunction system1 "fileRouterProxy" None
向代理发送消息总是失败:
// Send one command through the proxy (the send reported as failing).
// The path is a verbatim string: in a regular literal "\a" is the escape for
// the alert character, not a backslash followed by 'a'.
shardRegionProxy <! { ProgramId = "a"; Duration = TimeSpan.FromMinutes 10.; FilePath = @"\a_1.mp4" } //this failed
错误信息如下:
> [INFO][8/26/2020 5:13:15 PM][Thread 0027][akka://cluster-system/system/sharding/fileRouterProxyCoordinator/singleton/coordinator] Message [RegisterProxy] from akka://cluster-system/system/sharding/fileRouterProxyProxy to akka://cluster-system/system/sharding/fileRouterProxyCoordinator/singleton/coordinator was not delivered. [6] dead letters encountered. If this is not an expected behavior then akka://cluster-system/system/sharding/fileRouterProxyCoordinator/singleton/coordinator may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
但是直接向分片区域(region1、region2)发送消息是成功的:
// Send commands directly to the shard regions (these sends succeed).
// The paths are verbatim strings: in a regular literal "\a" is the escape for
// the alert character, not a backslash followed by 'a'.
region1 <! { ProgramId = "d"; Duration = TimeSpan.FromMinutes 8.; FilePath = @"\a_2.mp4" }
region2 <! { ProgramId = "a"; Duration = TimeSpan.FromMinutes 10.; FilePath = @"\a_1.mp4" }
抱歉,请问应该如何正确创建分片协调器(sharding coordinator)?
或者说,像我这样使用分片协调器有什么问题?
原因是名字写错了:代理使用的名称应与分片区域的名称 "fileRouter" 一致(同时指定角色 "Worker")。把代码改成下面这样就可以了:
// Corrected proxy: it uses the same name as the shard regions ("fileRouter")
// and is given the "Worker" role.
let shardRegionProxy =
    let workerRole = Some "Worker"
    spawnShardedProxy extractorFunction system1 "fileRouter" workerRole
我尝试使用以下代码向 Akka.NET 的分片区域代理(shard region proxy)发送消息:
open Akkling.Cluster.Sharding
open Akka.Actor
open Akka.Cluster
open Akka.Cluster.Sharding
open System
open Akkling
/// Builds the Akka.NET configuration for one cluster node listening on `port`.
/// The HOCON selects the cluster actor provider, binds dot-netty to localhost,
/// gives the node the "Worker" role, points sharding at the in-memory journal
/// and snapshot plugins, and lists localhost:5000 as the seed node.
/// The cluster-singleton and cluster-sharding default configs are attached as
/// fallbacks, in that order.
let configWithPort (port:int) =
    // HOCON text up to the place where the port number is spliced in.
    let hoconHead = """
akka {
actor {
provider = cluster
}
remote {
dot-netty.tcp {
public-hostname = "localhost"
hostname = "localhost"
port = """
    // HOCON text that follows the port number.
    let hoconTail = """
}
}
cluster {
roles = ["Worker"]
sharding {
journal-plugin-id = "akka.persistence.journal.inmem"
snapshot-plugin-id = "akka.persistence.snapshot-store.inmem"
}
seed-nodes = [ "akka.tcp://cluster-system@localhost:5000" ]
}
}
"""
    let nodeConfig = Configuration.parse (hoconHead + port.ToString() + hoconTail)
    let singletonDefaults = Akka.Cluster.Tools.Singleton.ClusterSingletonManager.DefaultConfig()
    let shardingDefaults = ClusterSharding.DefaultConfig()
    nodeConfig.WithFallback(singletonDefaults).WithFallback(shardingDefaults)
// Two actor systems with the same name ("cluster-system") are started in this
// process. system1 listens on port 5000, the address configWithPort lists as
// the seed node; system2 listens on port 5001.
let system1 =
    let seedNodeConfig = configWithPort 5000
    ActorSystem.Create("cluster-system", seedNodeConfig)
let system2 =
    let secondNodeConfig = configWithPort 5001
    ActorSystem.Create("cluster-system", secondNodeConfig)
/// Domain
/// Message type handled by the sharded entity actors in this sample.
type FileCommand = {
/// Program identifier; extractorFunction uses it as the entity id.
ProgramId : string
/// Duration value carried with the command; not read by any code in view.
Duration : TimeSpan
/// File path; aggregateRootActor writes it to the log.
FilePath : string
}
/// Actors
/// Entity behavior: for every FileCommand received, logs the program id, the
/// file path and the unique cluster address of the node running this actor,
/// then reports the message as ignored.
let aggregateRootActor (mailbox:Actor<_>) (msg:FileCommand) =
    let cluster = Cluster.Get(mailbox.System)
    let { ProgramId = programId; FilePath = path } = msg
    logInfof mailbox "Program: [%s] with path [%s] on [%A]" programId path cluster.SelfUniqueAddress
    ignored ()
/// Maps a FileCommand to the (shardId, entityId, message) triple used to
/// route it: the entity id is the command's ProgramId and the shard id is
/// "shard_N" with N in 0..4, derived from a hash of the entity id.
let extractorFunction (message:FileCommand) =
    let entityId = message.ProgramId
    let numberOfShards = 5
    // String.GetHashCode() is not guaranteed to return the same value in
    // different processes or runtimes, so two nodes could place the same
    // entity in different shards; `abs` also has no valid result for
    // Int32.MinValue. A small deterministic hash, masked to stay
    // non-negative, avoids both problems.
    let stableHash =
        entityId
        |> Seq.fold (fun acc ch -> (acc * 31 + int ch) &&& 0x7FFFFFFF) 0
    let shardId = sprintf "shard_%d" (stableHash % numberOfShards)
    shardId, entityId, message
// One shard region named "fileRouter" is spawned on each actor system; both
// use the same extractor and the same entity behavior.
let region1 =
    let entityProps = props (actorOf2 aggregateRootActor)
    spawnSharded extractorFunction system1 "fileRouter" entityProps
let region2 =
    let entityProps = props (actorOf2 aggregateRootActor)
    spawnSharded extractorFunction system2 "fileRouter" entityProps
// Proxy as written in the question. NOTE(review): it is spawned under the name
// "fileRouterProxy" with no role, while the regions are named "fileRouter";
// this is the version whose sends end up in dead letters.
let shardRegionProxy = spawnShardedProxy extractorFunction system1 "fileRouterProxy" None
向代理发送消息总是失败:
// Send one command through the proxy (the send reported as failing).
// The path is a verbatim string: in a regular literal "\a" is the escape for
// the alert character, not a backslash followed by 'a'.
shardRegionProxy <! { ProgramId = "a"; Duration = TimeSpan.FromMinutes 10.; FilePath = @"\a_1.mp4" } //this failed
错误信息如下:
> [INFO][8/26/2020 5:13:15 PM][Thread 0027][akka://cluster-system/system/sharding/fileRouterProxyCoordinator/singleton/coordinator] Message [RegisterProxy] from akka://cluster-system/system/sharding/fileRouterProxyProxy to akka://cluster-system/system/sharding/fileRouterProxyCoordinator/singleton/coordinator was not delivered. [6] dead letters encountered. If this is not an expected behavior then akka://cluster-system/system/sharding/fileRouterProxyCoordinator/singleton/coordinator may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
但是直接向分片区域(region1、region2)发送消息是成功的:
// Send commands directly to the shard regions (these sends succeed).
// The paths are verbatim strings: in a regular literal "\a" is the escape for
// the alert character, not a backslash followed by 'a'.
region1 <! { ProgramId = "d"; Duration = TimeSpan.FromMinutes 8.; FilePath = @"\a_2.mp4" }
region2 <! { ProgramId = "a"; Duration = TimeSpan.FromMinutes 10.; FilePath = @"\a_1.mp4" }
抱歉,请问应该如何正确创建分片协调器(sharding coordinator)?
或者说,像我这样使用分片协调器有什么问题?
原因是名字写错了:代理使用的名称应与分片区域的名称 "fileRouter" 一致(同时指定角色 "Worker")。把代码改成下面这样就可以了:
// Corrected proxy: it uses the same name as the shard regions ("fileRouter")
// and is given the "Worker" role.
let shardRegionProxy =
    let workerRole = Some "Worker"
    spawnShardedProxy extractorFunction system1 "fileRouter" workerRole