如何使用 Akka.FSharp API 在 Akka.NET 集群中实施故障转移?
How do I implement Failover within an Akka.NET cluster using the Akka.FSharp API?
如何使用 Akka.FSharp API 在 Akka.NET 集群中实施故障转移?
我有以下用作种子的集群节点:
open Akka
open Akka.FSharp
open Akka.Cluster
open System
open System.Configuration
let systemName = "script-cluster"
let nodeName = sprintf "cluster-node-%s" Environment.MachineName
let akkaConfig = Configuration.parse("""akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
}
remote {
log-remote-lifecycle-events = off
helios.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
roles = ["seed"] # custom node roles
seed-nodes = ["akka.tcp://script-cluster@127.0.0.1:2551"]
# when node cannot be reached within 10 sec, mark is as down
auto-down-unreachable-after = 10s
}
}""")
let actorSystem = akkaConfig |> System.create systemName
let clusterHostActor =
spawn actorSystem nodeName (fun (inbox: Actor<ClusterEvent.IClusterDomainEvent>) ->
let cluster = Cluster.Get actorSystem
cluster.Subscribe(inbox.Self, [| typeof<ClusterEvent.IClusterDomainEvent> |])
inbox.Defer(fun () -> cluster.Unsubscribe(inbox.Self))
let rec messageLoop () =
actor {
let! message = inbox.Receive()
// TODO: Handle messages
match message with
| :? ClusterEvent.MemberJoined as event -> printfn "Member %s Joined the Cluster at %O" event.Member.Address.Host DateTime.Now
| :? ClusterEvent.MemberLeft as event -> printfn "Member %s Left the Cluster at %O" event.Member.Address.Host DateTime.Now
| other -> printfn "Cluster Received event %O at %O" other DateTime.Now
return! messageLoop()
}
messageLoop())
然后我有一个可能会死的任意节点:
open Akka
open Akka.FSharp
open Akka.Cluster
open System
open System.Configuration
let systemName = "script-cluster"
let nodeName = sprintf "cluster-node-%s" Environment.MachineName
let akkaConfig = Configuration.parse("""akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
}
remote {
log-remote-lifecycle-events = off
helios.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
roles = ["role-a"] # custom node roles
seed-nodes = ["akka.tcp://script-cluster@127.0.0.1:2551"]
# when node cannot be reached within 10 sec, mark is as down
auto-down-unreachable-after = 10s
}
}""")
let actorSystem = akkaConfig |> System.create systemName
let listenerRef =
spawn actorSystem "temp2"
<| fun mailbox ->
let cluster = Cluster.Get (mailbox.Context.System)
cluster.Subscribe (mailbox.Self, [| typeof<ClusterEvent.IMemberEvent>|])
mailbox.Defer <| fun () -> cluster.Unsubscribe (mailbox.Self)
printfn "Created an actor on node [%A] with roles [%s]" cluster.SelfAddress (String.Join(",", cluster.SelfRoles))
let rec seed () =
actor {
let! (msg: obj) = mailbox.Receive ()
match msg with
| :? ClusterEvent.MemberRemoved as actor -> printfn "Actor removed %A" msg
| :? ClusterEvent.IMemberEvent -> printfn "Cluster event %A" msg
| _ -> printfn "Received: %A" msg
return! seed () }
seed ()
在集群内实施故障转移的推荐做法是什么?
具体来说,是否有代码示例说明集群在其中一个节点不再可用时应如何运行?
- 我的集群节点应该启动替代品还是有不同的行为?
- 是否有自动处理此问题的配置,我无需编写代码即可设置?
- 我必须在何处实施什么代码?
首先,依赖 MemberUp and MemberRemoved 事件(都实现 ClusterEvent.IMemberEvent 接口,所以订阅它)是一个更好的主意,因为它们标记阶段,当节点 joining/leaving 过程已经完成。加入和离开事件不一定确保节点在发出信号的时间点完全可操作。
关于故障转移场景:
- 替换的自动旋转可以通过 Akka.Cluster.Sharding 插件完成(阅读文章 1 and 2 to get more info about how does it work). There's no equivalent in Akka.FSharp for it, but you may use Akkling.Cluster.Sharding plugin instead: see example code。
- 另一种方法是在每个节点上预先创建替换角色。您可以使用 clustered routers or distributed publish/subscribe 将消息路由给他们。然而,这更像是一种情况,当你有无状态场景时,每个演员都可以随时接手另一个演员的工作。这是在生活在许多不同节点上的许多参与者之间分配工作的更通用的解决方案。
- 您还可以将观察者设置为处理 actor。通过使用 monitor 函数,您可以命令您的 actor 监视另一个 actor(无论它住在哪里)。在节点发生故障的情况下,有关死亡 actor 的信息将以
Terminated
消息的形式发送给其所有观察者。这样您就可以实现自己的逻辑,即在另一个节点上重新创建 actor。这实际上是最通用的方式,因为它不使用任何额外的插件或配置,但行为需要自己描述。
如何使用 Akka.FSharp API 在 Akka.NET 集群中实施故障转移?
我有以下用作种子的集群节点:
open Akka
open Akka.FSharp
open Akka.Cluster
open System
open System.Configuration
let systemName = "script-cluster"
let nodeName = sprintf "cluster-node-%s" Environment.MachineName
let akkaConfig = Configuration.parse("""akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
}
remote {
log-remote-lifecycle-events = off
helios.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
roles = ["seed"] # custom node roles
seed-nodes = ["akka.tcp://script-cluster@127.0.0.1:2551"]
# when node cannot be reached within 10 sec, mark is as down
auto-down-unreachable-after = 10s
}
}""")
let actorSystem = akkaConfig |> System.create systemName
let clusterHostActor =
spawn actorSystem nodeName (fun (inbox: Actor<ClusterEvent.IClusterDomainEvent>) ->
let cluster = Cluster.Get actorSystem
cluster.Subscribe(inbox.Self, [| typeof<ClusterEvent.IClusterDomainEvent> |])
inbox.Defer(fun () -> cluster.Unsubscribe(inbox.Self))
let rec messageLoop () =
actor {
let! message = inbox.Receive()
// TODO: Handle messages
match message with
| :? ClusterEvent.MemberJoined as event -> printfn "Member %s Joined the Cluster at %O" event.Member.Address.Host DateTime.Now
| :? ClusterEvent.MemberLeft as event -> printfn "Member %s Left the Cluster at %O" event.Member.Address.Host DateTime.Now
| other -> printfn "Cluster Received event %O at %O" other DateTime.Now
return! messageLoop()
}
messageLoop())
然后我有一个可能会死的任意节点:
open Akka
open Akka.FSharp
open Akka.Cluster
open System
open System.Configuration
let systemName = "script-cluster"
let nodeName = sprintf "cluster-node-%s" Environment.MachineName
let akkaConfig = Configuration.parse("""akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
}
remote {
log-remote-lifecycle-events = off
helios.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
roles = ["role-a"] # custom node roles
seed-nodes = ["akka.tcp://script-cluster@127.0.0.1:2551"]
# when node cannot be reached within 10 sec, mark is as down
auto-down-unreachable-after = 10s
}
}""")
let actorSystem = akkaConfig |> System.create systemName
let listenerRef =
spawn actorSystem "temp2"
<| fun mailbox ->
let cluster = Cluster.Get (mailbox.Context.System)
cluster.Subscribe (mailbox.Self, [| typeof<ClusterEvent.IMemberEvent>|])
mailbox.Defer <| fun () -> cluster.Unsubscribe (mailbox.Self)
printfn "Created an actor on node [%A] with roles [%s]" cluster.SelfAddress (String.Join(",", cluster.SelfRoles))
let rec seed () =
actor {
let! (msg: obj) = mailbox.Receive ()
match msg with
| :? ClusterEvent.MemberRemoved as actor -> printfn "Actor removed %A" msg
| :? ClusterEvent.IMemberEvent -> printfn "Cluster event %A" msg
| _ -> printfn "Received: %A" msg
return! seed () }
seed ()
在集群内实施故障转移的推荐做法是什么?
具体来说,是否有代码示例说明集群在其中一个节点不再可用时应如何运行?
- 我的集群节点应该启动替代品还是有不同的行为?
- 是否有自动处理此问题的配置,我无需编写代码即可设置?
- 我必须在何处实施什么代码?
首先,依赖 MemberUp and MemberRemoved 事件(都实现 ClusterEvent.IMemberEvent 接口,所以订阅它)是一个更好的主意,因为它们标记阶段,当节点 joining/leaving 过程已经完成。加入和离开事件不一定确保节点在发出信号的时间点完全可操作。
关于故障转移场景:
- 替换的自动旋转可以通过 Akka.Cluster.Sharding 插件完成(阅读文章 1 and 2 to get more info about how does it work). There's no equivalent in Akka.FSharp for it, but you may use Akkling.Cluster.Sharding plugin instead: see example code。
- 另一种方法是在每个节点上预先创建替换角色。您可以使用 clustered routers or distributed publish/subscribe 将消息路由给他们。然而,这更像是一种情况,当你有无状态场景时,每个演员都可以随时接手另一个演员的工作。这是在生活在许多不同节点上的许多参与者之间分配工作的更通用的解决方案。
- 您还可以将观察者设置为处理 actor。通过使用 monitor 函数,您可以命令您的 actor 监视另一个 actor(无论它住在哪里)。在节点发生故障的情况下,有关死亡 actor 的信息将以
Terminated
消息的形式发送给其所有观察者。这样您就可以实现自己的逻辑,即在另一个节点上重新创建 actor。这实际上是最通用的方式,因为它不使用任何额外的插件或配置,但行为需要自己描述。