Pubsub using Topic Groups with Akka Cluster

I'm trying to create a pubsub-style application using Akka Cluster. I've been reading the docs on pubsub and am now trying to run their example.

My basic workflow is as follows:

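1. Run the subscriber (which becomes the master/leader).
2. Run the publisher (which sends a string to the topic; all subscribers should receive this message).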

Here is my code:

Subscriber.scala

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}

class Subscriber extends Actor with ActorLogging {
  import DistributedPubSubMediator.{ Subscribe, SubscribeAck }
  val mediator = DistributedPubSub(context.system).mediator
  // subscribe to the topic named "content"
  mediator ! Subscribe("content", self)

  def receive = {
    case s: String =>
      log.info("Got {}", s)
    case SubscribeAck(Subscribe("content", None, `self`)) =>
      log.info("subscribing");
  }
}

object SubscriberMain extends App {
  val system = ActorSystem("ClusterSystem")
  val actor = system.actorOf(Props[Subscriber], name="Subscriber")
}

Publisher.scala

import akka.actor.{Actor, ActorSystem, Props}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import com.typesafe.config.ConfigFactory

class Publisher extends Actor {
  import DistributedPubSubMediator.Publish
  // activate the extension
  val mediator = DistributedPubSub(context.system).mediator

  def receive = {
    case in: String => {
      val out = in.toUpperCase
      println(s"Received '$in', transformed to '$out'.")
      mediator ! Publish("content", out)
    }
  }
}

object PublisherMain extends App {
  val config = ConfigFactory.load()
  val system = ActorSystem("ClusterSystem", config.getConfig("PublishApp"))
  val actor = system.actorOf(Props[Publisher], name="Publisher")
  actor ! "something small"
}

application.conf

akka {
  loglevel = "INFO"
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }
  log-dead-letters = 0
  log-dead-letters-during-shutdown = off
}

PublishApp {
  akka {
    loglevel = "DEBUG"
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
      log-sent-messages = on
      log-received-messages = on
    }
  }
}

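What I'm finding is that when I run the publisher's main, it hangs while trying to publish to the "content" topic, and the subscriber never receives the message.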

The publisher's log contains the following:

[INFO] [12/09/2016 15:14:35.513] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/system/distributedPubSubMediator] Message [java.lang.String] from Actor[akka://ClusterSystem/user/Publisher#-1478463431] to Actor[akka://ClusterSystem/system/distributedPubSubMediator#-1539813703] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

I've been poring over the docs, but the topic concept seems to be mentioned only as an afterthought.

Why isn't my subscriber receiving the message?

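Actually, both your publisher and your subscriber are fine. The problem is that you started directly from the distributed publish-subscribe example. As a prerequisite, you first need a cluster to be set up, so that messages can be distributed through it.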

Main.scala:

import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import com.example.Publisher
import com.example.Subscriber

object Main {
  def main(args: Array[String]): Unit = {
    val systemName = "PubSub"
    val system1 = ActorSystem(systemName)
    val joinAddress = Cluster(system1).selfAddress
    Cluster(system1).join(joinAddress)
    val publisher = system1.actorOf(Props[Publisher], "publisher")

    Thread.sleep(5000)
    val system2 = ActorSystem(systemName)
    Cluster(system2).join(joinAddress)
    system2.actorOf(Props[Subscriber], "subscriber")

    Thread.sleep(5000)
    publisher ! "something"
  }
}

If you now run Main.scala, you will see the following:

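  • system1 is created and joins the cluster (which is implicitly created by that join)
  • system2 is created and joins the cluster
  • The message is sent to the publisher, which forwards it to the mediator, which then distributes it across the cluster
  • The subscriber receives the message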
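
The startup sequence above relies on Thread.sleep to give each node time to join. As a rough sketch of an alternative, the variant below uses Cluster(system).registerOnMemberUp so that each step runs only once the node itself has reached the Up state. The object name MainWithMemberUp and the 5-second delay before publishing are my own choices for illustration. The delay is kept on the assumption that the subscription still needs some time to propagate between the mediators after the node is Up; I have not measured how long that takes.

```scala
import akka.actor.{ActorSystem, Props}
import akka.cluster.Cluster
import com.example.{Publisher, Subscriber}

import scala.concurrent.duration._

// Hypothetical variant of Main.scala; the name is illustrative only.
object MainWithMemberUp {
  def main(args: Array[String]): Unit = {
    val systemName = "PubSub"

    // The first node joins itself, which forms a new single-node cluster.
    val system1 = ActorSystem(systemName)
    val joinAddress = Cluster(system1).selfAddress
    Cluster(system1).join(joinAddress)
    val publisher = system1.actorOf(Props[Publisher], "publisher")

    // Runs once system1's own member status has changed to Up.
    Cluster(system1).registerOnMemberUp {
      // The second node joins the cluster formed by the first node.
      val system2 = ActorSystem(systemName)
      Cluster(system2).join(joinAddress)

      // Runs once system2's own member status has changed to Up.
      Cluster(system2).registerOnMemberUp {
        system2.actorOf(Props[Subscriber], "subscriber")

        // Assumption: the subscription needs a moment to reach the
        // publisher's mediator, so the first message is still delayed.
        import system2.dispatcher
        system2.scheduler.scheduleOnce(5.seconds) {
          publisher ! "something"
        }
      }
    }
  }
}
```

This removes the guesswork about when each node has joined, but it does not guarantee delivery: a message published before the subscription is known to the publishing node's mediator can still be lost.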

Edit: I simplified your application.conf to the following:

akka {
  loglevel = "INFO"
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  log-dead-letters = 0
  log-dead-letters-during-shutdown = off
}

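Note netty.tcp.port = 0 - this makes each system bind to a randomly assigned free port, so the cluster members don't run into port conflicts. You can see ports 49759 and 49772 in my output.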


Output:

Running Main 
[INFO] [12/10/2016 12:44:09.175] [main] [akka.remote.Remoting] Starting remoting
[INFO] [12/10/2016 12:44:09.439] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://PubSub@127.0.0.1:49759]
[INFO] [12/10/2016 12:44:09.451] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Starting up...
[INFO] [12/10/2016 12:44:09.537] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Started up successfully
[INFO] [12/10/2016 12:44:09.537] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [12/10/2016 12:44:09.546] [PubSub-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [12/10/2016 12:44:09.549] [PubSub-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Metrics collection has started successfully
[INFO] [12/10/2016 12:44:09.573] [PubSub-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - No seed-nodes configured, manual cluster join required
[INFO] [12/10/2016 12:44:09.594] [PubSub-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Node [akka.tcp://PubSub@127.0.0.1:49759] is JOINING, roles []
[INFO] [12/10/2016 12:44:09.606] [PubSub-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Leader is moving node [akka.tcp://PubSub@127.0.0.1:49759] to [Up]
[INFO] [12/10/2016 12:44:14.598] [main] [akka.remote.Remoting] Starting remoting
[INFO] [12/10/2016 12:44:14.616] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://PubSub@127.0.0.1:49772]
[INFO] [12/10/2016 12:44:14.617] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Starting up...
[INFO] [12/10/2016 12:44:14.626] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Started up successfully
[INFO] [12/10/2016 12:44:14.627] [PubSub-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [12/10/2016 12:44:14.627] [PubSub-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Metrics collection has started successfully
[INFO] [12/10/2016 12:44:14.633] [PubSub-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - No seed-nodes configured, manual cluster join required
[INFO] [12/10/2016 12:44:14.653] [PubSub-akka.actor.default-dispatcher-16] [akka.tcp://PubSub@127.0.0.1:49772/user/subscriber] subscribing
[INFO] [12/10/2016 12:44:14.844] [PubSub-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Node [akka.tcp://PubSub@127.0.0.1:49772] is JOINING, roles []
[INFO] [12/10/2016 12:44:14.920] [PubSub-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Welcome from [akka.tcp://PubSub@127.0.0.1:49759]
[INFO] [12/10/2016 12:44:15.580] [PubSub-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Leader is moving node [akka.tcp://PubSub@127.0.0.1:49772] to [Up]
Received 'something', transformed to 'SOMETHING'.
[INFO] [12/10/2016 12:44:19.673] [PubSub-akka.actor.default-dispatcher-4] [akka.tcp://PubSub@127.0.0.1:49772/user/subscriber] Got SOMETHING