Akka-Streams ActorPublisher 收不到任何 Request 消息

Akka-Streams ActorPublisher does not receive any Request messages

我正在尝试使用此库连续阅读维基百科 IRC 频道:https://github.com/implydata/wikiticker

我创建了一个自定义 Akka Publisher,它将在我的系统中用作 Source

这是我的一些 类:

class IrcPublisher() extends ActorPublisher[String] {
  import scala.collection._

  var queue: mutable.Queue[String] = mutable.Queue()

  override def receive: Actor.Receive = {
    case Publish(s) =>
      println(s"->MSG, isActive = $isActive, totalDemand = $totalDemand")
      queue.enqueue(s)
      publishIfNeeded()

    case Request(cnt) =>
      println("Request: " + cnt)
      publishIfNeeded()

    case Cancel =>
      println("Cancel")
      context.stop(self)

    case _ =>
      println("Hm...")
  }

  def publishIfNeeded(): Unit = {
    while (queue.nonEmpty && isActive && totalDemand > 0) {
      println("onNext")
      onNext(queue.dequeue())
    }
  }
 }

object IrcPublisher {
  case class Publish(data: String)
}

我正在创建所有这些对象:

  def createSource(wikipedias: Seq[String]) {
      val dataPublisherRef = system.actorOf(Props[IrcPublisher])
      val dataPublisher = ActorPublisher[String](dataPublisherRef)
      val listener = new MessageListener {
        override def process(message: Message) = {
          dataPublisherRef ! Publish(Jackson.generate(message.toMap))
        }
      }

      val ticker = new IrcTicker(
        "irc.wikimedia.org",
        "imply",
        wikipedias map (x => s"#$x.wikipedia"),
        Seq(listener)
      )

      ticker.start() // if I comment this...
      Thread.currentThread().join() //... and this I get Request(...)

      Source.fromPublisher(dataPublisher)
}

所以我面临的问题是这个 Source 对象。虽然此实现适用于其他来源(例如来自本地文件),但 ActorPublisher 不会收到 Request() 消息。

如果我评论我可以看到的两条标记行,我的演员已经从我的流中收到请求(计数)消息。否则所有消息都将被推入队列,但不在我的流中(因此我可以看到打印的 MSG 消息)。

我认为这里 multithreading/synchronization 有点问题。

我对 wikiticker 不够熟悉,无法解决您给出的问题。我想问的一个问题是:为什么要加入当前线程?

但是,我认为您将 Source 的用法过于复杂化了。将流作为一个整体来处理比创建自定义 ActorPublisher 更容易。

您可以使用 Source.actorRef 将流具体化为 ActorRef 并使用该 ActorRef。这允许您利用 akka 代码在缓冲区上执行 enqueing/dequeing,同时您可以专注于 "business logic"。

比如说,你的整个流只是过滤超过一定长度的行并将它们打印到控制台。这可以通过以下方式完成:

def dispatchIRCMessages(actorRef : ActorRef) = {
  val ticker = 
     new IrcTicker("irc.wikimedia.org",
                   "imply",
                   wikipedias map (x => s"#$x.wikipedia"),
                   Seq(new MessageListener {
                         override def process(message: Message) = 
                          actorRef ! Publish(Jackson.generate(message.toMap))
                       }))

  ticker.start()
  Thread.currentThread().join()
}


//these variables control the buffer behavior
val bufferSize = 1024
val overFlowStrategy = akka.stream.OverflowStrategy.dropHead

val minMessageSize = 32

//no need for a custom Publisher/Queue
val streamRef = 
  Source.actorRef[String](bufferSize, overFlowStrategy)
        .via(Flow[String].filter(_.size > minMessageSize))
        .to(Sink.foreach[String](println))
        .run()

dispatchIRCMessages(streamRef)

dispatchIRCMessages 有一个额外的好处,它可以与任何 ActorRef 一起工作,因此您不需要只与 streams/publishers.

一起工作

希望这能解决您的根本问题...

我认为主要问题是Thread.currentThread().join()。此行将 'hang' 当前线程,因为此线程正在等待自己死亡。请阅读 https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#join-long- .