如何在 F# 的集群配置中创建角色

How to create an actor in a clustered configuration in F#

我正在创建一个 Akka.Cluster 示例,其中包含三个节点 A、B 和 C,其中 A 是灯塔。到目前为止,从日志来看,当没有参与者或参与者是本地的(使用 spawnspawnOpt 创建)时,集群工作正常。我想从 B 创建一个演员并从 C 访问它。

let _ = spawne system "r-actor" <@ actorOf (fun msg -> printfn "Received: %s" msg) @> []

我明白了

2016-08-31 01:59:00.1185|INFO|Akka.Actor.EmptyLocalActorRef|Message String from akka://calculatorSystem/deadLetters to akka://calculatorSystem/user/r-actor was not delivered. 1 dead letters encountered.

使用

let r = FromConfig.Instance :> RouterConfig |> SpawnOption.Router
let _ = spawne system "r-actor" <@ actorOf (fun msg -> printfn "Received: %s" msg) @> [r]

抛出异常

An unhandled exception of type Akka.Configuration.ConfigurationException occurred in Akka.dll
Additional information: Configuration problem while creating [akka://calculatorSystem/user/r-actor] with router dispatcher [akka.actor.default-dispatcher] and mailbox and routee dispatcher [akka.actor.default-dispatcher] and mailbox [].

节点C上的测试函数是

let rec calculate content =
  printfn "Processing %s" content
  let actor = select "/user/r-actor" system
  actor <! content
  let text = Console.ReadLine()
  if text <> "quit" then
    calculate text
calculate "sample1"

HOCON(节点 B)

    akka {
      actor {
        provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
        serializers {
          wire = "Akka.Serialization.WireSerializer, Akka.Serialization.Wire"
        }
        serialization-bindings {
          "System.Object" = wire
        }
        deployment {
          /user/add {
            router = round-robin-pool
            nr-of-instances = 10
            cluster {
              enabled = on
              max-nr-of-instances-per-node = 10
              allow-local-routees = off
            }
          }
          /user/r-actor {
            router = round-robin-pool
            nr-of-instances = 10
            cluster {
              enabled = on
              max-nr-of-instances-per-node = 10
              allow-local-routees = off
            }
          }
        }
      }
      remote {
        log-remote-lifecycle-events = DEBUG
        log-received-messages = on
        helios.tcp {
          transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
          applied-adapters = []
          transport-protocol = tcp
          hostname = "127.0.0.1"
          port = 0
        }
      }
      loggers = ["Akka.Logger.NLog.NLogLogger,Akka.Logger.NLog"]
      cluster {
        seed-nodes = [
          "akka.tcp://calculatorSystem@127.0.0.1:7001"
        ]
        roles = ["add-service"]
        auto-down-unreachable-after = 10s
      }
    }

如何创建一个可以被集群中的另一个节点调用的actor?

定义路由器配置时,不要使用 /user 前缀 - 它会自动添加。

另外,如果你想要 select 驻留在另一个节点上的 actor,你需要使用完整的 actor 路径(带有节点地址)——这是因为事实上,可能有不同的 actor 生活在不同的节点上用同样的路径。一定要准确。

通常您在编译时不知道节点的地址。有两种提取方式:

1。从集群状态获取地址

这更容易,但您正在失去对 joining/leaving 节点做出反应的能力。你每次都被迫检查它,这很慢。此示例使用 ActorSelection.ResolveOne 检索实际 IActorRef 实例。

async {
    let members = Cluster.Get(system).State.Members
    let actorRefs =
        members 
        |> Seq.filter (fun m -> m.Roles.Contains("expected")) // use roles to filter out nodes that shouldn't be checked
        |> Seq.map (fun m -> 
            let selection = select (m.Address.ToString() + "user/r-actor") system
            let actorRef = selection.ResolveOne(timeout) |> Async.AwaitTask)
        |> Async.Parallel }

2。订阅集群事件并对 joining/leaving 个节点

做出反应

在这里您可以对节点做出反应,因为它们 join/leave。它还使用 Identify/ActorIdentity 接收实际的 IActorRef,这是更快的选项。

let aref =  
    spawn system "listener"
    <| fun mailbox ->
        let cluster = Cluster.Get (mailbox.Context.System)
        cluster.Subscribe (mailbox.Self, [| typeof<ClusterEvent.IMemberEvent> |])
        mailbox.Defer <| fun () -> cluster.Unsubscribe (mailbox.Self)
        let rec loop () = 
            actor {
                let! (msg: obj) = mailbox.Receive ()
                match msg with
                | :? ClusterEvent.MemberUp as up -> 
                    // new node joined the cluster
                    let selection = select (up.Member.Address.ToString() + "user/r-actor") mailbox
                    selection <! Identify(null) // request actor under selection to identify itself
                | :? ActorIdentity as id when id.Subject <> null ->
                    // actor has identified itself
                    id.Subject <! "hello"
                | :? ClusterEvent.MemberRemoved as rem -> 
                    //  node leaved the cluster, invalidate all actors from that node
                | _ -> ()
                return! loop () }
        loop ()

如有疑问,我写了一篇关于从 F# 创建 Akka.NET 集群的 blog post。也许你会发现它很有用。