Pubsub using Topic Groups with Akka Cluster
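I'm trying to build a pubsub-style application with Akka Cluster. I've been reading the docs on pubsub and am now trying to run their example.
My basic workflow is this:
- Run the subscriber (it becomes the master/leader).
- Run the publisher (it sends a string to the topic; all subscribers should receive this message).
Here is my code: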
Subscriber.scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}

class Subscriber extends Actor with ActorLogging {
  import DistributedPubSubMediator.{ Subscribe, SubscribeAck }

  val mediator = DistributedPubSub(context.system).mediator
  // subscribe to the topic named "content"
  mediator ! Subscribe("content", self)

  def receive = {
    case s: String =>
      log.info("Got {}", s)
    case SubscribeAck(Subscribe("content", None, `self`)) =>
      log.info("subscribing")
  }
}

object SubscriberMain extends App {
  val system = ActorSystem("ClusterSystem")
  val actor = system.actorOf(Props[Subscriber], name = "Subscriber")
}
Publisher.scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import com.typesafe.config.ConfigFactory

class Publisher extends Actor {
  import DistributedPubSubMediator.Publish

  // activate the extension
  val mediator = DistributedPubSub(context.system).mediator

  def receive = {
    case in: String =>
      val out = in.toUpperCase
      println(s"Received '$in', transformed to '$out'.")
      mediator ! Publish("content", out)
  }
}

object PublisherMain extends App {
  val config = ConfigFactory.load()
  val system = ActorSystem("ClusterSystem", config.getConfig("PublishApp"))
  val actor = system.actorOf(Props[Publisher], name = "Publisher")
  actor ! "something small"
}
application.conf
akka {
  loglevel = "INFO"
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
    auto-down-unreachable-after = 10s
  }
  log-dead-letters = 0
  log-dead-letters-during-shutdown = off
}
PublishApp {
  akka {
    loglevel = "DEBUG"
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
      log-sent-messages = on
      log-received-messages = on
    }
  }
}
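What I'm finding is that when I run the publisher's main, it hangs when it tries to publish to the "content" topic, and the subscriber never receives the message.
The publisher's log contains the following: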
[INFO] [12/09/2016 15:14:35.513] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/system/distributedPubSubMediator] Message [java.lang.String] from Actor[akka://ClusterSystem/user/Publisher#-1478463431] to Actor[akka://ClusterSystem/system/distributedPubSubMediator#-1539813703] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
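I've been poring over the docs, but the concept of a topic seems to be treated as an afterthought.
Why isn't my subscriber receiving the message?
Actually, your publisher and subscriber are both fine. The problem is that you started straight from the Distributed Publish Subscribe example, which takes an existing cluster for granted. As a prerequisite you need to set up the cluster itself, since the mediators distribute messages across its member nodes.
Main.scala: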
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import com.example.Publisher
import com.example.Subscriber

object Main {
  def main(args: Array[String]): Unit = {
    val systemName = "PubSub"

    val system1 = ActorSystem(systemName)
    val joinAddress = Cluster(system1).selfAddress
    Cluster(system1).join(joinAddress)
    val publisher = system1.actorOf(Props[Publisher], "publisher")
    Thread.sleep(5000)

    val system2 = ActorSystem(systemName)
    Cluster(system2).join(joinAddress)
    system2.actorOf(Props[Subscriber], "subscriber")
    Thread.sleep(5000)

    publisher ! "something"
  }
}
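If you now run Main.scala, you will see the following:
- system1 is created and joins the cluster (which is implicitly created by joining itself)
- system2 is created and joins the cluster
- the message is sent to the publisher, which forwards it to the mediator, which then distributes it across the cluster
- the subscriber receives the message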
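The two Thread.sleep calls in Main.scala are only there to give the cluster time to form. As a rough sketch of an alternative, Cluster(system).registerOnMemberUp can run a callback once the local node has reached the Up state. The object name MainOnMemberUp and the 3-second delay are assumptions made for illustration; the delay is still needed because the subscription has to be gossiped to the mediator on system1 before a publish can reach it, and 3 seconds is a guess, not a guarantee.

```scala
import akka.actor.{ActorSystem, Props}
import akka.cluster.Cluster
import com.example.{Publisher, Subscriber}
import scala.concurrent.duration._

// Hypothetical variant of Main that reacts to membership instead of sleeping.
object MainOnMemberUp {
  def main(args: Array[String]): Unit = {
    val systemName = "PubSub"

    // First node: joins itself, which forms the cluster.
    val system1 = ActorSystem(systemName)
    val joinAddress = Cluster(system1).selfAddress
    Cluster(system1).join(joinAddress)
    val publisher = system1.actorOf(Props[Publisher], "publisher")

    // Second node: joins the cluster formed by system1.
    val system2 = ActorSystem(systemName)
    Cluster(system2).join(joinAddress)

    // Runs once system2's own member status has become Up.
    Cluster(system2).registerOnMemberUp {
      system2.actorOf(Props[Subscriber], "subscriber")

      // The subscription still has to reach system1's mediator via gossip,
      // so publish after a short (assumed) delay rather than immediately.
      import system1.dispatcher
      system1.scheduler.scheduleOnce(3.seconds, publisher, "something")
    }
  }
}
```

If the delay turns out to be too short, the message ends up in dead letters just as in the question, so treat the value as something to tune.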
Edit: I simplified your application.conf to the following:
akka {
  loglevel = "INFO"
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  log-dead-letters = 0
  log-dead-letters-during-shutdown = off
}
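Note netty.tcp.port = 0: this makes each actor system bind to a randomly assigned free port, so the cluster members cannot run into port conflicts. You can see ports 49759 and 49772 in my output.
Output: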
Running Main
[INFO] [12/10/2016 12:44:09.175] [main] [akka.remote.Remoting] Starting remoting
[INFO] [12/10/2016 12:44:09.439] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://PubSub@127.0.0.1:49759]
[INFO] [12/10/2016 12:44:09.451] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Starting up...
[INFO] [12/10/2016 12:44:09.537] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Started up successfully
[INFO] [12/10/2016 12:44:09.537] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [12/10/2016 12:44:09.546] [PubSub-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [12/10/2016 12:44:09.549] [PubSub-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Metrics collection has started successfully
[INFO] [12/10/2016 12:44:09.573] [PubSub-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - No seed-nodes configured, manual cluster join required
[INFO] [12/10/2016 12:44:09.594] [PubSub-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Node [akka.tcp://PubSub@127.0.0.1:49759] is JOINING, roles []
[INFO] [12/10/2016 12:44:09.606] [PubSub-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Leader is moving node [akka.tcp://PubSub@127.0.0.1:49759] to [Up]
[INFO] [12/10/2016 12:44:14.598] [main] [akka.remote.Remoting] Starting remoting
[INFO] [12/10/2016 12:44:14.616] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://PubSub@127.0.0.1:49772]
[INFO] [12/10/2016 12:44:14.617] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Starting up...
[INFO] [12/10/2016 12:44:14.626] [main] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Started up successfully
[INFO] [12/10/2016 12:44:14.627] [PubSub-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [12/10/2016 12:44:14.627] [PubSub-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Metrics collection has started successfully
[INFO] [12/10/2016 12:44:14.633] [PubSub-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - No seed-nodes configured, manual cluster join required
[INFO] [12/10/2016 12:44:14.653] [PubSub-akka.actor.default-dispatcher-16] [akka.tcp://PubSub@127.0.0.1:49772/user/subscriber] subscribing
[INFO] [12/10/2016 12:44:14.844] [PubSub-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Node [akka.tcp://PubSub@127.0.0.1:49772] is JOINING, roles []
[INFO] [12/10/2016 12:44:14.920] [PubSub-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49772] - Welcome from [akka.tcp://PubSub@127.0.0.1:49759]
[INFO] [12/10/2016 12:44:15.580] [PubSub-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://PubSub)] Cluster Node [akka.tcp://PubSub@127.0.0.1:49759] - Leader is moving node [akka.tcp://PubSub@127.0.0.1:49772] to [Up]
Received 'something', transformed to 'SOMETHING'.
[INFO] [12/10/2016 12:44:19.673] [PubSub-akka.actor.default-dispatcher-4] [akka.tcp://PubSub@127.0.0.1:49772/user/subscriber] Got SOMETHING
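A note on the original two-process setup, in case you want to keep SubscriberMain and PublisherMain as separate JVMs: config.getConfig("PublishApp") hands the publisher only the PublishApp subtree, and that subtree has no akka.cluster.seed-nodes entry, so the publisher node has nothing to join. A minimal sketch of one possible fix, assuming the subscriber keeps port 2551, is started first, and acts as the only seed node:

```hocon
PublishApp {
  akka {
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
    }
    cluster {
      # Assumption: the subscriber node listens on 2551 and is already running
      seed-nodes = ["akka.tcp://ClusterSystem@127.0.0.1:2551"]
    }
  }
}
```

Even with this in place, a message published immediately at startup can still land in dead letters, because the node has to join and learn about the subscription first.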