Listening to a remote akka ActorSystem's log stream

I am trying to subscribe to the log stream of a remote akka ActorSystem, basically in order to write a console that displays the runtime logs of remote Actors.

The only way I can think of is to create an Actor inside the logging ActorSystem, have that Actor subscribe to ActorSystem.eventStream, and then subscribe to that Actor using actorSelection from inside my console's ActorSystem.

But this seems very "indirect", since the logging pipeline would look like:

logging Actor --> eventStream --> Actor subscribed to eventStream --> local Actor

Is there a simpler way to subscribe to the event stream?

From a simplicity viewpoint, nothing prevents you from subscribing a remote actor directly to your event stream, without an additional actor. The Akka documentation mentions:

The event stream is a local facility, meaning that it will not distribute events to other nodes in a clustered environment (unless you subscribe a Remote Actor to the stream explicitly). If you need to broadcast events in an Akka cluster, without knowing your recipients explicitly (i.e. obtaining their ActorRefs), you may want to look into: Distributed Publish Subscribe in Cluster.

For illustration purposes, consider the following code fragment which corresponds to the remote system, the one you want to subscribe to:

  import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
  import akka.event.Logging.LogEvent
  import scala.concurrent.duration._
  import scala.util.Random

  class PublisherActor extends Actor with ActorLogging { // example publisher actor just to generate some logs
    import context.dispatcher // implicit ExecutionContext required by the scheduler
    context.system.scheduler.schedule(1.second, 1.second, self, "echo")
    def receive = {
      case "echo" =>
        val x = Random.nextInt(100)
        log.info(s"I got a random number: $x")
    }
  }

  def runPublisher() = {
    println("=== running publisher node ===")
    val system = ActorSystem("PublisherSystem")
    import system.dispatcher // implicit ExecutionContext required by the Future callback
    val selection = system.actorSelection("akka.tcp://SubscriberSystem@127.0.0.1:2553/user/subscriber")
    selection.resolveOne(10.seconds).foreach { listener => // when the listener actor is available,
      system.eventStream.subscribe(listener, classOf[LogEvent]) // subscribe it to the event stream
    }
    val publisher = system.actorOf(Props[PublisherActor], "publisher") // some example publisher
  }

And then the corresponding subscriber in the "local" node, from where you want to show the logs:

  class SubscriberActor extends Actor with ActorLogging {
    log.info("subscriber listening...")
    def receive = {
      case msg => log.info(s"Got: $msg") // every forwarded log event ends up here
    }
  }

  def runSubscriber() = {
    println("=== running subscriber node ===")
    val system = ActorSystem("SubscriberSystem")
    val listener = system.actorOf(Props[SubscriberActor], "subscriber")
  }
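Both fragments presuppose that remoting is enabled on each node, which the code above does not show. A minimal sketch of that configuration follows, assuming Akka 2.5-style classic remoting keys (matching the akka.tcp address used above). The helper names remotingConfig, startSubscriberSystem and startPublisherSystem are made up for illustration, and port 2552 for the publisher is an assumption; only port 2553 comes from the code above.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper: classic (netty.tcp) remoting bound to 127.0.0.1 on the given port.
def remotingConfig(port: Int): Config =
  ConfigFactory.parseString(s"""
    akka.actor.provider = remote
    akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
    akka.remote.netty.tcp.hostname = "127.0.0.1"
    akka.remote.netty.tcp.port = $port
  """).withFallback(ConfigFactory.load())

// The subscriber must listen on 2553, the port hardcoded in the publisher's actorSelection.
def startSubscriberSystem(): ActorSystem = ActorSystem("SubscriberSystem", remotingConfig(2553))

// 2552 is an assumed port; any free port other than 2553 would do on the same host.
def startPublisherSystem(): ActorSystem = ActorSystem("PublisherSystem", remotingConfig(2552))
```

The same settings could equally live in each node's application.conf; newer Akka versions use different keys for classic remoting, so check the documentation for the version you run.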

However, this solution has several caveats: the subscriber must be running before the publisher tries to resolve it (or the publisher must implement some retry policy until the subscriber is up), the location is hardcoded, and so on. If you want a more robust and resilient system, and it is permissible, follow the advice in the documentation and use distributed publish-subscribe in a clustered environment, which offers several advantages with a similar amount of boilerplate.
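To give an idea of what the clustered variant might look like, here is a rough sketch using the DistributedPubSub mediator from akka-cluster-tools. It assumes both nodes already run with the cluster provider and have joined the same cluster, which is not shown. The actor names LogForwarder and LogConsole and the topic name "logs" are invented for this example, and the sketch does not guard against feedback from log events caused by the forwarding itself.

```scala
import akka.actor.Actor
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe, SubscribeAck}
import akka.event.Logging.LogEvent

// Runs on the node that produces the logs: forwards local log events to a cluster-wide topic.
class LogForwarder extends Actor {
  private val mediator = DistributedPubSub(context.system).mediator
  override def preStart(): Unit = context.system.eventStream.subscribe(self, classOf[LogEvent])
  override def postStop(): Unit = context.system.eventStream.unsubscribe(self)
  def receive = {
    case event: LogEvent => mediator ! Publish("logs", event)
  }
}

// Runs on the console node: receives every event published to the "logs" topic.
class LogConsole extends Actor {
  private val mediator = DistributedPubSub(context.system).mediator
  mediator ! Subscribe("logs", self)
  def receive = {
    case _: SubscribeAck => println("subscribed to topic 'logs'")
    // println instead of log.info, so the console does not generate new log events
    case event: LogEvent => println(s"[${event.level}] ${event.logSource}: ${event.message}")
  }
}
```

With this layout neither node needs to know the other's address, and the start order no longer matters: the console simply starts receiving events once it has subscribed to the topic.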

Hope it helped!