我在 SimpleClusterListener 的 F# 实现中收到错误
I receive errors with my F# implementation of SimpleClusterListener
我在 SimpleClusterListener 的 F# 实现中观察到以下错误:
[ERROR][3/20/2017 11:32:53 AM][Thread
0008][[akka://ClusterSystem/system/endpoin
tManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%400.0.0.0%3A2552-
5/endpointWriter#1522364225]] Dropping message
[Akka.Actor.ActorSelectionMessage ] for non-local recipient
[[akka.tcp://ClusterSystem@localhost:2552/]] arriving at
[akka.tcp://ClusterSystem@localhost:2552] inbound addresses
[akka.tcp://Clust erSystem@0.0.0.0:2552]
我 运行 C# 实现(在下面的附录中引用)没有问题。此外,我使用的端口与 C# 实现所使用的端口相同。
注意:
我是 Akka.Net 的新手,因此,我正在努力解决我尝试移植的示例出错的地方。
我的实现如下:
Main.fs
module Program
open System
open System.Configuration
open Akka.Configuration.Hocon
open Akka.Configuration
open Akka.Actor
open Samples.Cluster.Simple
[<Literal>]
let ExitWithSuccess = 0
let createActor port =
let section = ConfigurationManager.GetSection "akka" :?> AkkaConfigurationSection
let config = ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + port)
.WithFallback(section.AkkaConfig)
let system = ActorSystem.Create ("ClusterSystem", config)
let actorRef = Props.Create(typeof<SimpleClusterListener>)
system.ActorOf(actorRef, "clusterListener") |> ignore
let startUp (ports:string list) = ports |> List.iter createActor
[<EntryPoint>]
let main args =
startUp ["2551"; "2552"; "0"]
Console.WriteLine("Press any key to exit")
Console.ReadLine() |> ignore
ExitWithSuccess
SimpleClusterListener.fs
namespace Samples.Cluster.Simple
open Akka.Actor
open Akka.Cluster
open Akka.Event
type SimpleClusterListener() =
inherit UntypedActor()
override this.PreStart() =
let cluster = Cluster.Get(UntypedActor.Context.System)
let (events:System.Type array) = [| typeof<ClusterEvent.IMemberEvent>
typeof<ClusterEvent.UnreachableMember> |]
cluster.Subscribe(base.Self, ClusterEvent.InitialStateAsEvents, events)
override this.OnReceive(message:obj) =
let log = UntypedActor.Context.GetLogger()
match message with
| :? ClusterEvent.MemberUp as e -> log.Info("Member is up: {0}", e.Member)
| :? ClusterEvent.UnreachableMember as e -> log.Info("Member detected as unreachable: {0}", e.Member)
| :? ClusterEvent.MemberRemoved as e -> log.Info("Member is removed: {0}", e.Member)
| _ -> ()
override this.PostStop() =
let cluster = Akka.Cluster.Cluster.Get(UntypedActor.Context.System)
cluster.Unsubscribe base.Self
永远不会调用上面的 OnReceive 方法。但是,PreStart 方法可以。
附录:
如前所述,我移植了下面的 C# 实现。我成功地 运行 这段代码。因此,当我尝试移植它时,我很困惑我哪里出错了。
//-----------------------------------------------------------------------
// <copyright file="Program.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using Akka.Actor;
using Akka.Configuration;
using Akka.Configuration.Hocon;
using System;
using System.Configuration;
namespace Samples.Cluster.Simple
{
class Program
{
static void Main(string[] args)
{
StartUp(args.Length == 0 ? new String[] { "2551", "2552", "0" } : args);
Console.WriteLine("Press any key to exit");
Console.ReadLine();
}
public static void StartUp(string[] ports)
{
var section = (AkkaConfigurationSection)ConfigurationManager.GetSection("akka");
foreach (var port in ports)
{
//Override the configuration of the port
var config =
ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + port)
.WithFallback(section.AkkaConfig);
//create an Akka system
var system = ActorSystem.Create("ClusterSystem", config);
//create an actor that handles cluster domain events
system.ActorOf(Props.Create(typeof(SimpleClusterListener)), "clusterListener");
}
}
}
}
//-----------------------------------------------------------------------
// <copyright file="SimpleClusterListener.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using Akka.Actor;
using Akka.Cluster;
using Akka.Event;
namespace Samples.Cluster.Simple
{
public class SimpleClusterListener : UntypedActor
{
protected ILoggingAdapter Log = Context.GetLogger();
protected Akka.Cluster.Cluster Cluster = Akka.Cluster.Cluster.Get(Context.System);
/// <summary>
/// Need to subscribe to cluster changes
/// </summary>
protected override void PreStart() =>
Cluster.Subscribe(Self, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.UnreachableMember) });
/// <summary>
/// Re-subscribe on restart
/// </summary>
protected override void PostStop() => Cluster.Unsubscribe(Self);
protected override void OnReceive(object message)
{
var up = message as ClusterEvent.MemberUp;
if (up != null)
{
var mem = up;
Log.Info("Member is Up: {0}", mem.Member);
}
else if (message is ClusterEvent.UnreachableMember)
{
var unreachable = (ClusterEvent.UnreachableMember)message;
Log.Info("Member detected as unreachable: {0}", unreachable.Member);
}
else if (message is ClusterEvent.MemberRemoved)
{
var removed = (ClusterEvent.MemberRemoved)message;
Log.Info("Member is Removed: {0}", removed.Member);
}
else if (message is ClusterEvent.IMemberEvent)
{
//IGNORE
}
else if (message is ClusterEvent.CurrentClusterState)
{
}
else
{
Unhandled(message);
}
}
}
}
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka"/>
</configSections>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2"/>
</startup>
<akka>
<hocon>
<![CDATA[
akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
}
remote {
log-remote-lifecycle-events = DEBUG
dot-netty.tcp {
hostname = "localhost"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@localhost:2551",
"akka.tcp://ClusterSystem@localhost:2552"]
#auto-down-unreachable-after = 30s
}
}
]]>
</hocon>
</akka>
</configuration>
假设您使用的是 1.1.3 包。
你应该使用
ConfigurationFactory.ParseString("akka.remote.helios.tcp.port=" + port)
.WithFallback(section.AkkaConfig);`
而不是 dot-netty
传输。那个我们还没有发布。并且仅在开发分支中可用。
我在 SimpleClusterListener 的 F# 实现中观察到以下错误:
[ERROR][3/20/2017 11:32:53 AM][Thread 0008][[akka://ClusterSystem/system/endpoin tManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%400.0.0.0%3A2552- 5/endpointWriter#1522364225]] Dropping message [Akka.Actor.ActorSelectionMessage ] for non-local recipient [[akka.tcp://ClusterSystem@localhost:2552/]] arriving at [akka.tcp://ClusterSystem@localhost:2552] inbound addresses [akka.tcp://Clust erSystem@0.0.0.0:2552]
我 运行 C# 实现(在下面的附录中引用)没有问题。此外,我使用的端口与 C# 实现所使用的端口相同。
注意:
我是 Akka.Net 的新手,因此,我正在努力解决我尝试移植的示例出错的地方。
我的实现如下:
Main.fs
module Program
open System
open System.Configuration
open Akka.Configuration.Hocon
open Akka.Configuration
open Akka.Actor
open Samples.Cluster.Simple
[<Literal>]
let ExitWithSuccess = 0
let createActor port =
let section = ConfigurationManager.GetSection "akka" :?> AkkaConfigurationSection
let config = ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + port)
.WithFallback(section.AkkaConfig)
let system = ActorSystem.Create ("ClusterSystem", config)
let actorRef = Props.Create(typeof<SimpleClusterListener>)
system.ActorOf(actorRef, "clusterListener") |> ignore
let startUp (ports:string list) = ports |> List.iter createActor
[<EntryPoint>]
let main args =
startUp ["2551"; "2552"; "0"]
Console.WriteLine("Press any key to exit")
Console.ReadLine() |> ignore
ExitWithSuccess
SimpleClusterListener.fs
namespace Samples.Cluster.Simple
open Akka.Actor
open Akka.Cluster
open Akka.Event
type SimpleClusterListener() =
inherit UntypedActor()
override this.PreStart() =
let cluster = Cluster.Get(UntypedActor.Context.System)
let (events:System.Type array) = [| typeof<ClusterEvent.IMemberEvent>
typeof<ClusterEvent.UnreachableMember> |]
cluster.Subscribe(base.Self, ClusterEvent.InitialStateAsEvents, events)
override this.OnReceive(message:obj) =
let log = UntypedActor.Context.GetLogger()
match message with
| :? ClusterEvent.MemberUp as e -> log.Info("Member is up: {0}", e.Member)
| :? ClusterEvent.UnreachableMember as e -> log.Info("Member detected as unreachable: {0}", e.Member)
| :? ClusterEvent.MemberRemoved as e -> log.Info("Member is removed: {0}", e.Member)
| _ -> ()
override this.PostStop() =
let cluster = Akka.Cluster.Cluster.Get(UntypedActor.Context.System)
cluster.Unsubscribe base.Self
永远不会调用上面的 OnReceive 方法。但是,PreStart 方法可以。
附录:
如前所述,我移植了下面的 C# 实现。我成功地 运行 这段代码。因此,当我尝试移植它时,我很困惑我哪里出错了。
//-----------------------------------------------------------------------
// <copyright file="Program.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using Akka.Actor;
using Akka.Configuration;
using Akka.Configuration.Hocon;
using System;
using System.Configuration;
namespace Samples.Cluster.Simple
{
class Program
{
static void Main(string[] args)
{
StartUp(args.Length == 0 ? new String[] { "2551", "2552", "0" } : args);
Console.WriteLine("Press any key to exit");
Console.ReadLine();
}
public static void StartUp(string[] ports)
{
var section = (AkkaConfigurationSection)ConfigurationManager.GetSection("akka");
foreach (var port in ports)
{
//Override the configuration of the port
var config =
ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + port)
.WithFallback(section.AkkaConfig);
//create an Akka system
var system = ActorSystem.Create("ClusterSystem", config);
//create an actor that handles cluster domain events
system.ActorOf(Props.Create(typeof(SimpleClusterListener)), "clusterListener");
}
}
}
}
//-----------------------------------------------------------------------
// <copyright file="SimpleClusterListener.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using Akka.Actor;
using Akka.Cluster;
using Akka.Event;
namespace Samples.Cluster.Simple
{
public class SimpleClusterListener : UntypedActor
{
protected ILoggingAdapter Log = Context.GetLogger();
protected Akka.Cluster.Cluster Cluster = Akka.Cluster.Cluster.Get(Context.System);
/// <summary>
/// Need to subscribe to cluster changes
/// </summary>
protected override void PreStart() =>
Cluster.Subscribe(Self, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.UnreachableMember) });
/// <summary>
/// Re-subscribe on restart
/// </summary>
protected override void PostStop() => Cluster.Unsubscribe(Self);
protected override void OnReceive(object message)
{
var up = message as ClusterEvent.MemberUp;
if (up != null)
{
var mem = up;
Log.Info("Member is Up: {0}", mem.Member);
}
else if (message is ClusterEvent.UnreachableMember)
{
var unreachable = (ClusterEvent.UnreachableMember)message;
Log.Info("Member detected as unreachable: {0}", unreachable.Member);
}
else if (message is ClusterEvent.MemberRemoved)
{
var removed = (ClusterEvent.MemberRemoved)message;
Log.Info("Member is Removed: {0}", removed.Member);
}
else if (message is ClusterEvent.IMemberEvent)
{
//IGNORE
}
else if (message is ClusterEvent.CurrentClusterState)
{
}
else
{
Unhandled(message);
}
}
}
}
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka"/>
</configSections>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2"/>
</startup>
<akka>
<hocon>
<![CDATA[
akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
}
remote {
log-remote-lifecycle-events = DEBUG
dot-netty.tcp {
hostname = "localhost"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@localhost:2551",
"akka.tcp://ClusterSystem@localhost:2552"]
#auto-down-unreachable-after = 30s
}
}
]]>
</hocon>
</akka>
</configuration>
假设您使用的是 1.1.3 包。 你应该使用
ConfigurationFactory.ParseString("akka.remote.helios.tcp.port=" + port)
.WithFallback(section.AkkaConfig);`
而不是 dot-netty
传输。那个我们还没有发布。并且仅在开发分支中可用。