使用 play 框架的 Akka 集群设置

Akka cluster setup with play framework

我目前正在尝试使用自动发现服务来实现集群播放 + akka 实现。但是,我似乎 运行 对 play 中包含的 Guice DI 加载器存在问题。他们的文档摘录指出:

https://www.playframework.com/documentation/2.5.x/ScalaAkka#Integrating-with-Akka

While we recommend you use the built in actor system, as it sets up everything such as the correct classloader, lifecycle hooks, etc, there is nothing stopping you from using your own actor system. It is important however to ensure you do the following:

Register a stop hook to shut the actor system down when Play shuts down Pass in the correct classloader from the Play Environment otherwise Akka won’t be able to find your applications classes

Ensure that either you change the location that Play reads it’s akka configuration from using play.akka.config, or that you don’t read your akka configuration from the default akka config, as this will cause problems such as when the systems try to bind to the same remote ports

我已经完成了他们推荐的上述配置,但是我似乎无法绕开 play 仍然绑定它来自 BuiltInModule 的内部 ActorSystemProvider:

class BuiltinModule extends Module {
def bindings(env: Environment, configuration: Configuration): Seq[Binding[_]] = 

    {
        def dynamicBindings(factories: ((Environment, Configuration) => Seq[Binding[_]])*) = {
          factories.flatMap(_(env, configuration))
        }

        Seq(
          bind[Environment] to env,
          bind[ConfigurationProvider].to(new ConfigurationProvider(configuration)),
          bind[Configuration].toProvider[ConfigurationProvider],
          bind[HttpConfiguration].toProvider[HttpConfiguration.HttpConfigurationProvider],

          // Application lifecycle, bound both to the interface, and its implementation, so that Application can access it
          // to shut it down.
          bind[DefaultApplicationLifecycle].toSelf,
          bind[ApplicationLifecycle].to(bind[DefaultApplicationLifecycle]),

          bind[Application].to[DefaultApplication],
          bind[play.Application].to[play.DefaultApplication],

          bind[Router].toProvider[RoutesProvider],
          bind[play.routing.Router].to[JavaRouterAdapter],
          bind[ActorSystem].toProvider[ActorSystemProvider],
          bind[Materializer].toProvider[MaterializerProvider],
          bind[ExecutionContextExecutor].toProvider[ExecutionContextProvider],
          bind[ExecutionContext].to[ExecutionContextExecutor],
          bind[Executor].to[ExecutionContextExecutor],
          bind[HttpExecutionContext].toSelf,

          bind[CryptoConfig].toProvider[CryptoConfigParser],
          bind[CookieSigner].toProvider[CookieSignerProvider],
          bind[CSRFTokenSigner].toProvider[CSRFTokenSignerProvider],
          bind[AESCrypter].toProvider[AESCrypterProvider],
          bind[play.api.libs.Crypto].toSelf,
          bind[TemporaryFileCreator].to[DefaultTemporaryFileCreator]
        ) ++ dynamicBindings(
            HttpErrorHandler.bindingsFromConfiguration,
            HttpFilters.bindingsFromConfiguration,
            HttpRequestHandler.bindingsFromConfiguration,
            ActionCreator.bindingsFromConfiguration
          )
      }
    }

我曾尝试创建自己的 GuiceApplicationBuilder 以绕过此问题,但现在它只是将重复绑定异常移至来自 BuiltInModule。

这是我正在尝试的:

AkkaConfigModule:

package module.akka

import com.google.inject.{AbstractModule, Inject, Provider, Singleton}
import com.typesafe.config.Config
import module.akka.AkkaConfigModule.AkkaConfigProvider
import net.codingwell.scalaguice.ScalaModule
import play.api.Application

/**
  * Created by dmcquill on 8/15/16.
  */
object AkkaConfigModule {
    @Singleton
    class AkkaConfigProvider @Inject() (application: Application) extends Provider[Config] {
        override def get() = {
            val classLoader = application.classloader
            NodeConfigurator.loadConfig(classLoader)
        }
    }
}

/**
  * Binds the application configuration to the [[Config]] interface.
  *
  * The config is bound as an eager singleton so that errors in the config are detected
  * as early as possible.
  */
class AkkaConfigModule extends AbstractModule with ScalaModule {

    override def configure() {
        bind[Config].toProvider[AkkaConfigProvider].asEagerSingleton()
    }

}

ActorSystemModule:

package module.akka


import actor.cluster.ClusterMonitor
import akka.actor.ActorSystem
import com.google.inject._
import com.typesafe.config.Config
import net.codingwell.scalaguice.ScalaModule
import play.api.inject.ApplicationLifecycle

import scala.collection.JavaConversions._

/**
  * Created by dmcquill on 7/27/16.
  */
object ActorSystemModule {
    @Singleton
    class ActorSystemProvider @Inject() (val lifecycle: ApplicationLifecycle, val config: Config, val injector: Injector) extends Provider[ActorSystem] {
        override def get() = {
            val system = ActorSystem(config.getString(NodeConfigurator.CLUSTER_NAME_PROP), config.getConfig("fitnessApp"))

            // add the GuiceAkkaExtension to the system, and initialize it with the Guice injector
            GuiceAkkaExtension(system).initialize(injector)

            system.log.info("Configured seed nodes: " + config.getStringList("fitnessApp.akka.cluster.seed-nodes").mkString(", "))
            system.actorOf(GuiceAkkaExtension(system).props(ClusterMonitor.name))

            lifecycle.addStopHook { () =>
                system.terminate()
            }

            system
        }
    }
}

/**
  * A module providing an Akka ActorSystem.
  */
class ActorSystemModule extends AbstractModule with ScalaModule {
    import module.akka.ActorSystemModule.ActorSystemProvider

    override def configure() {
        bind[ActorSystem].toProvider[ActorSystemProvider].asEagerSingleton()
    }
}

应用程序加载器:

class CustomApplicationLoader extends GuiceApplicationLoader {

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
        initialBuilder
            .overrides(overrides(context): _*)
            .bindings(new AkkaConfigModule, new ActorSystemModule)
    }

}

我需要完成的主要事情是配置 ActorSystem,以便我可以以编程方式加载 Akka 集群的种子节点。

以上方法是正确的方法还是有更好的方法来实现?如果这是正确的方法,我对 play/guice?

的 DI 设置根本不了解吗?

更新

对于这个架构,play+akka位于同一个节点上。

lightbend 有一个很好的例子 http://www.lightbend.com/activator/template/play-akka-cluster-sample 您可以下载示例并重复使用它。

最后我尝试做一些比必要的更复杂的事情。我没有执行上述流程,而是简单地以编程方式扩展了初始配置,以便我可以以编程方式检索必要的网络信息。

最终结果主要包括几个 classes:

NodeConfigurator:此 class 包含用于从 application.conf 检索属性的相关实用方法,然后以编程方式创建配置以结合使用使用 kubernetes 发现服务。

object NodeConfigurator {

    /**
      * This method given a class loader will return the configuration object for an ActorSystem
      * in a clustered environment
      *
      * @param classLoader the configured classloader of the application
      * @return Config
      */
    def loadConfig(classLoader: ClassLoader) = {
        val config = ConfigFactory.load(classLoader)

        val clusterName = config.getString(CLUSTER_NAME_PROP)
        val seedPort = config.getString(SEED_PORT_CONF_PROP)

        val host = if (config.getString(HOST_CONF_PROP) equals "eth0-address-or-localhost") {
            getLocalHostAddress.getOrElse(DEFAULT_HOST_ADDRESS)
        } else {
            config.getString(HOST_CONF_PROP)
        }

        ConfigFactory.parseString(formatSeedNodesConfig(clusterName, getSeedNodes(config), seedPort, host))
            .withValue(HOST_CONF_PROP, ConfigValueFactory.fromAnyRef(host))
            .withValue("fitnessApp.akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(host))
            .withFallback(config)
            .resolve()
    }

    /**
      * Get the local ip address which defaults to localhost if not
      * found on the eth0 adapter
      *
      * @return Option[String]
      */
    def getLocalHostAddress:  Option[String] = {
        import java.net.NetworkInterface

        import scala.collection.JavaConversions._

        NetworkInterface.getNetworkInterfaces
            .find(_.getName equals "eth0")
            .flatMap { interface =>
                interface.getInetAddresses.find(_.isSiteLocalAddress).map(_.getHostAddress)
            }
    }

    /**
      * Retrieves a set of seed nodes that are currently running in our cluster
      *
      * @param config akka configuration object
      * @return Array[String]
      */
    def getSeedNodes(config: Config) = {
        if(config.hasPath(SEED_NODES_CONF_PROP)) {
            config.getString(SEED_NODES_CONF_PROP).split(",").map(_.trim)
        } else {
            Array.empty[String]
        }
    }

    /**
      * formats the seed node addresses in the proper format
      *
      * @param clusterName name of akka cluster
      * @param seedNodeAddresses listing of current seed nodes
      * @param seedNodePort configured seed node port
      * @param defaultSeedNodeAddress default seed node address
      * @return
      */
    def formatSeedNodesConfig(clusterName: String, seedNodeAddresses: Array[String], seedNodePort: String, defaultSeedNodeAddress: String) = {
        if(seedNodeAddresses.isEmpty) {
            s"""fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://$clusterName@$defaultSeedNodeAddress:$seedNodePort" ]"""
        } else {
            seedNodeAddresses.map { address =>
                s"""fitnessApp.akka.cluster.seed-nodes += "akka.tcp://$clusterName@$address:$seedNodePort""""
            }.mkString("\n")
        }
    }

    val CLUSTER_NAME_PROP = "fitnessAkka.cluster-name"
    val HOST_CONF_PROP = "fitnessAkka.host"
    val PORT_CONF_PROP = "fitnessAkka.port"
    val SEED_NODES_CONF_PROP = "fitnessAkka.seed-nodes"
    val SEED_PORT_CONF_PROP = "fitnessAkka.seed-port"

    private val DEFAULT_HOST_ADDRESS = "127.0.0.1"
}

CustomApplicationLoader:简单地使用 play 的可覆盖应用程序加载器从 NodeConfigurator 获取生成的配置,然后用它扩展 initialConfiguration。

class CustomApplicationLoader extends GuiceApplicationLoader {

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
        val classLoader = context.environment.classLoader
        val configuration = Configuration(NodeConfigurator.loadConfig(classLoader))

        initialBuilder
                .in(context.environment)
                .loadConfig(context.initialConfiguration ++ configuration)
                .overrides(overrides(context): _*)
    }

}

AkkaActorModule:提供依赖可注入的 actor ref 与 API 一起使用以显示集群成员。

class AkkaActorModule extends AbstractModule with AkkaGuiceSupport {
    def configure = {
        bindActor[ClusterMonitor]("cluster-monitor")
    }
}

ClusterMonitor:这是一个简单地监听集群事件并另外接收消息以产生当前集群状态的actor。

class ClusterMonitor @Inject() extends Actor with ActorLogging {
    import actor.cluster.ClusterMonitor.GetClusterState

    val cluster = Cluster(context.system)
    private var nodes = Set.empty[Address]

    override def preStart(): Unit = {
        cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
    }

    override def postStop(): Unit = cluster.unsubscribe(self)

    override def receive = {
        case MemberUp(member) => {
            nodes += member.address
            log.info(s"Cluster member up: ${member.address}")
        }
        case UnreachableMember(member) => log.warning(s"Cluster member unreachable: ${member.address}")
        case MemberRemoved(member, previousStatus) => {
            nodes -= member.address
            log.info(s"Cluster member removed: ${member.address}")
        }
        case MemberExited(member) => log.info(s"Cluster member exited: ${member.address}")
        case GetClusterState => sender() ! nodes
        case _: MemberEvent =>
    }

}

object ClusterMonitor {
    case class GetClusterState()
}

应用程序: 简单的测试控制器输出已加入集群的节点列表

class Application @Inject() (@Named("cluster-monitor") clusterMonitorRef: ActorRef) extends Controller {

    implicit val addressWrites = new Writes[Address] {
        def writes(address: Address) = Json.obj(
            "host" -> address.host,
            "port" -> address.port,
            "protocol" -> address.protocol,
            "system" -> address.system
        )
    }

    implicit val timeout = Timeout(5, TimeUnit.SECONDS)

    def listClusterNodes = Action.async {
        (clusterMonitorRef ? GetClusterState).mapTo[Set[Address]].map { addresses =>
            Ok(Json.toJson(addresses))
        }
    }

}

上述控制器的结果产生类似于以下的输出:

$ http GET 192.168.99.100:30760/cluster/nodes

HTTP/1.1 200 OK
Content-Length: 235
Content-Type: application/json
Date: Thu, 18 Aug 2016 02:50:30 GMT

[
    {
        "host": "172.17.0.3", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }, 
    {
        "host": "172.17.0.4", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }, 
    {
        "host": "172.17.0.5", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }
]