Akka-Streams ActorPublisher 收不到任何 Request 消息
Akka-Streams ActorPublisher does not receive any Request messages
我正在尝试使用此库连续阅读维基百科 IRC 频道:https://github.com/implydata/wikiticker
我创建了一个自定义 Akka Publisher,它将在我的系统中用作 Source
。
这是我的一些 类:
class IrcPublisher() extends ActorPublisher[String] {
import scala.collection._
var queue: mutable.Queue[String] = mutable.Queue()
override def receive: Actor.Receive = {
case Publish(s) =>
println(s"->MSG, isActive = $isActive, totalDemand = $totalDemand")
queue.enqueue(s)
publishIfNeeded()
case Request(cnt) =>
println("Request: " + cnt)
publishIfNeeded()
case Cancel =>
println("Cancel")
context.stop(self)
case _ =>
println("Hm...")
}
def publishIfNeeded(): Unit = {
while (queue.nonEmpty && isActive && totalDemand > 0) {
println("onNext")
onNext(queue.dequeue())
}
}
}
object IrcPublisher {
case class Publish(data: String)
}
我正在创建所有这些对象:
def createSource(wikipedias: Seq[String]) {
val dataPublisherRef = system.actorOf(Props[IrcPublisher])
val dataPublisher = ActorPublisher[String](dataPublisherRef)
val listener = new MessageListener {
override def process(message: Message) = {
dataPublisherRef ! Publish(Jackson.generate(message.toMap))
}
}
val ticker = new IrcTicker(
"irc.wikimedia.org",
"imply",
wikipedias map (x => s"#$x.wikipedia"),
Seq(listener)
)
ticker.start() // if I comment this...
Thread.currentThread().join() //... and this I get Request(...)
Source.fromPublisher(dataPublisher)
}
所以我面临的问题是这个 Source
对象。虽然此实现适用于其他来源(例如来自本地文件),但 ActorPublisher 不会收到 Request()
消息。
如果我评论我可以看到的两条标记行,我的演员已经从我的流中收到请求(计数)消息。否则所有消息都将被推入队列,但不在我的流中(因此我可以看到打印的 MSG 消息)。
我认为这里 multithreading/synchronization 有点问题。
我对 wikiticker 不够熟悉,无法解决您给出的问题。我想问的一个问题是:为什么要加入当前线程?
但是,我认为您将 Source
的用法过于复杂化了。将流作为一个整体来处理比创建自定义 ActorPublisher
更容易。
您可以使用 Source.actorRef
将流具体化为 ActorRef 并使用该 ActorRef。这允许您利用 akka 代码在缓冲区上执行 enqueing/dequeing,同时您可以专注于 "business logic"。
比如说,你的整个流只是过滤超过一定长度的行并将它们打印到控制台。这可以通过以下方式完成:
def dispatchIRCMessages(actorRef : ActorRef) = {
val ticker =
new IrcTicker("irc.wikimedia.org",
"imply",
wikipedias map (x => s"#$x.wikipedia"),
Seq(new MessageListener {
override def process(message: Message) =
actorRef ! Publish(Jackson.generate(message.toMap))
}))
ticker.start()
Thread.currentThread().join()
}
//these variables control the buffer behavior
val bufferSize = 1024
val overFlowStrategy = akka.stream.OverflowStrategy.dropHead
val minMessageSize = 32
//no need for a custom Publisher/Queue
val streamRef =
Source.actorRef[String](bufferSize, overFlowStrategy)
.via(Flow[String].filter(_.size > minMessageSize))
.to(Sink.foreach[String](println))
.run()
dispatchIRCMessages(streamRef)
dispatchIRCMessages
有一个额外的好处,它可以与任何 ActorRef
一起工作,因此您不需要只与 streams/publishers.
一起工作
希望这能解决您的根本问题...
我认为主要问题是Thread.currentThread().join()
。此行将 'hang' 当前线程,因为此线程正在等待自己死亡。请阅读 https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#join-long- .
我正在尝试使用此库连续阅读维基百科 IRC 频道:https://github.com/implydata/wikiticker
我创建了一个自定义 Akka Publisher,它将在我的系统中用作 Source
。
这是我的一些 类:
class IrcPublisher() extends ActorPublisher[String] {
import scala.collection._
var queue: mutable.Queue[String] = mutable.Queue()
override def receive: Actor.Receive = {
case Publish(s) =>
println(s"->MSG, isActive = $isActive, totalDemand = $totalDemand")
queue.enqueue(s)
publishIfNeeded()
case Request(cnt) =>
println("Request: " + cnt)
publishIfNeeded()
case Cancel =>
println("Cancel")
context.stop(self)
case _ =>
println("Hm...")
}
def publishIfNeeded(): Unit = {
while (queue.nonEmpty && isActive && totalDemand > 0) {
println("onNext")
onNext(queue.dequeue())
}
}
}
object IrcPublisher {
case class Publish(data: String)
}
我正在创建所有这些对象:
def createSource(wikipedias: Seq[String]) {
val dataPublisherRef = system.actorOf(Props[IrcPublisher])
val dataPublisher = ActorPublisher[String](dataPublisherRef)
val listener = new MessageListener {
override def process(message: Message) = {
dataPublisherRef ! Publish(Jackson.generate(message.toMap))
}
}
val ticker = new IrcTicker(
"irc.wikimedia.org",
"imply",
wikipedias map (x => s"#$x.wikipedia"),
Seq(listener)
)
ticker.start() // if I comment this...
Thread.currentThread().join() //... and this I get Request(...)
Source.fromPublisher(dataPublisher)
}
所以我面临的问题是这个 Source
对象。虽然此实现适用于其他来源(例如来自本地文件),但 ActorPublisher 不会收到 Request()
消息。
如果我评论我可以看到的两条标记行,我的演员已经从我的流中收到请求(计数)消息。否则所有消息都将被推入队列,但不在我的流中(因此我可以看到打印的 MSG 消息)。
我认为这里 multithreading/synchronization 有点问题。
我对 wikiticker 不够熟悉,无法解决您给出的问题。我想问的一个问题是:为什么要加入当前线程?
但是,我认为您将 Source
的用法过于复杂化了。将流作为一个整体来处理比创建自定义 ActorPublisher
更容易。
您可以使用 Source.actorRef
将流具体化为 ActorRef 并使用该 ActorRef。这允许您利用 akka 代码在缓冲区上执行 enqueing/dequeing,同时您可以专注于 "business logic"。
比如说,你的整个流只是过滤超过一定长度的行并将它们打印到控制台。这可以通过以下方式完成:
def dispatchIRCMessages(actorRef : ActorRef) = {
val ticker =
new IrcTicker("irc.wikimedia.org",
"imply",
wikipedias map (x => s"#$x.wikipedia"),
Seq(new MessageListener {
override def process(message: Message) =
actorRef ! Publish(Jackson.generate(message.toMap))
}))
ticker.start()
Thread.currentThread().join()
}
//these variables control the buffer behavior
val bufferSize = 1024
val overFlowStrategy = akka.stream.OverflowStrategy.dropHead
val minMessageSize = 32
//no need for a custom Publisher/Queue
val streamRef =
Source.actorRef[String](bufferSize, overFlowStrategy)
.via(Flow[String].filter(_.size > minMessageSize))
.to(Sink.foreach[String](println))
.run()
dispatchIRCMessages(streamRef)
dispatchIRCMessages
有一个额外的好处,它可以与任何 ActorRef
一起工作,因此您不需要只与 streams/publishers.
希望这能解决您的根本问题...
我认为主要问题是Thread.currentThread().join()
。此行将 'hang' 当前线程,因为此线程正在等待自己死亡。请阅读 https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#join-long- .