我在 SimpleClusterListener 的 F# 实现中收到错误

I receive errors with my F# implementation of SimpleClusterListener

我在 SimpleClusterListener 的 F# 实现中观察到以下错误:

[ERROR][3/20/2017 11:32:53 AM][Thread 0008][[akka://ClusterSystem/system/endpoin tManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%400.0.0.0%3A2552- 5/endpointWriter#1522364225]] Dropping message [Akka.Actor.ActorSelectionMessage ] for non-local recipient [[akka.tcp://ClusterSystem@localhost:2552/]] arriving at [akka.tcp://ClusterSystem@localhost:2552] inbound addresses [akka.tcp://Clust erSystem@0.0.0.0:2552]

我 运行 C# 实现(在下面的附录中引用)没有问题。此外,我使用的端口与 C# 实现所使用的端口相同。

注意:

我是 Akka.Net 的新手,因此,我正在努力解决我尝试移植的示例出错的地方。

我的实现如下:

Main.fs

module Program

open System
open System.Configuration
open Akka.Configuration.Hocon
open Akka.Configuration
open Akka.Actor
open Samples.Cluster.Simple

[<Literal>]
let ExitWithSuccess = 0

let createActor port =
    let section = ConfigurationManager.GetSection "akka" :?> AkkaConfigurationSection
    let config =  ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + port)
                    .WithFallback(section.AkkaConfig) 
    let system = ActorSystem.Create ("ClusterSystem", config)
    let actorRef = Props.Create(typeof<SimpleClusterListener>)
    system.ActorOf(actorRef, "clusterListener") |> ignore

let startUp (ports:string list) = ports |> List.iter createActor

[<EntryPoint>]
let main args =
    startUp ["2551"; "2552"; "0"]
    Console.WriteLine("Press any key to exit")
    Console.ReadLine() |> ignore
    ExitWithSuccess

SimpleClusterListener.fs

namespace Samples.Cluster.Simple

open Akka.Actor
open Akka.Cluster
open Akka.Event

type SimpleClusterListener() =

    inherit UntypedActor()

        override this.PreStart() =
            let cluster = Cluster.Get(UntypedActor.Context.System)
            let (events:System.Type array) = [| typeof<ClusterEvent.IMemberEvent>
                                                typeof<ClusterEvent.UnreachableMember> |]
            cluster.Subscribe(base.Self, ClusterEvent.InitialStateAsEvents, events)

        override this.OnReceive(message:obj) =

            let log = UntypedActor.Context.GetLogger()

            match message with
            | :? ClusterEvent.MemberUp          as e -> log.Info("Member is up: {0}",                   e.Member)
            | :? ClusterEvent.UnreachableMember as e -> log.Info("Member detected as unreachable: {0}", e.Member)
            | :? ClusterEvent.MemberRemoved     as e -> log.Info("Member is removed: {0}",              e.Member)
            | _ -> ()

        override this.PostStop() = 
            let cluster = Akka.Cluster.Cluster.Get(UntypedActor.Context.System)
            cluster.Unsubscribe base.Self

永远不会调用上面的 OnReceive 方法。但是,PreStart 方法可以。

附录:

如前所述,我移植了下面的 C# 实现。我成功地 运行 这段代码。因此,当我尝试移植它时,我很困惑我哪里出错了。

//-----------------------------------------------------------------------
// <copyright file="Program.cs" company="Akka.NET Project">
//     Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
//     Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Configuration;
using Akka.Configuration.Hocon;
using System;
using System.Configuration;

namespace Samples.Cluster.Simple
{
    class Program
    {
        static void Main(string[] args)
        {
            StartUp(args.Length == 0 ? new String[] { "2551", "2552", "0" } : args);
            Console.WriteLine("Press any key to exit");
            Console.ReadLine();
        }

        public static void StartUp(string[] ports)
        {
            var section = (AkkaConfigurationSection)ConfigurationManager.GetSection("akka");
            foreach (var port in ports)
            {
                //Override the configuration of the port
                var config =
                    ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + port)
                        .WithFallback(section.AkkaConfig);

                //create an Akka system
                var system = ActorSystem.Create("ClusterSystem", config);

                //create an actor that handles cluster domain events
                system.ActorOf(Props.Create(typeof(SimpleClusterListener)), "clusterListener");
            }
        }
    }
}



//-----------------------------------------------------------------------
// <copyright file="SimpleClusterListener.cs" company="Akka.NET Project">
//     Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
//     Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Cluster;
using Akka.Event;

namespace Samples.Cluster.Simple
{
    public class SimpleClusterListener : UntypedActor
    {
        protected ILoggingAdapter Log = Context.GetLogger();
        protected Akka.Cluster.Cluster Cluster = Akka.Cluster.Cluster.Get(Context.System);

        /// <summary>
        /// Need to subscribe to cluster changes
        /// </summary>
        protected override void PreStart() =>
            Cluster.Subscribe(Self, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.UnreachableMember) });

        /// <summary>
        /// Re-subscribe on restart
        /// </summary>
        protected override void PostStop() => Cluster.Unsubscribe(Self);

        protected override void OnReceive(object message)
        {
            var up = message as ClusterEvent.MemberUp;
            if (up != null)
            {
                var mem = up;
                Log.Info("Member is Up: {0}", mem.Member);
            }
            else if (message is ClusterEvent.UnreachableMember)
            {
                var unreachable = (ClusterEvent.UnreachableMember)message;
                Log.Info("Member detected as unreachable: {0}", unreachable.Member);
            }
            else if (message is ClusterEvent.MemberRemoved)
            {
                var removed = (ClusterEvent.MemberRemoved)message;
                Log.Info("Member is Removed: {0}", removed.Member);
            }
            else if (message is ClusterEvent.IMemberEvent)
            {
                //IGNORE                
            }
            else if (message is ClusterEvent.CurrentClusterState)
            {

            }
            else
            {
                Unhandled(message);
            }
        }
    }
}


<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>
    <section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka"/>
  </configSections>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2"/>
  </startup>
  <akka>
    <hocon>
      <![CDATA[
          akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
            }

            remote {
              log-remote-lifecycle-events = DEBUG
              dot-netty.tcp {
                hostname = "localhost"
                port = 0
              }
            }

            cluster {
              seed-nodes = [
                "akka.tcp://ClusterSystem@localhost:2551",
                "akka.tcp://ClusterSystem@localhost:2552"]

              #auto-down-unreachable-after = 30s
            }
          }
      ]]>
    </hocon>
  </akka>
</configuration>

假设您使用的是 1.1.3 包。 你应该使用

ConfigurationFactory.ParseString("akka.remote.helios.tcp.port=" + port)
                    .WithFallback(section.AkkaConfig);`

而不是 dot-netty 传输。那个我们还没有发布。并且仅在开发分支中可用。