使用 ScalaFX 监控 Akka 流源
Monitoring Akka Streams Sources with ScalaFX
我要解决的是以下情况:
给定无限 运行ning Akka Stream 我希望能够监视流的某些点。此时我能想到的将消息发送到 Actor 的最佳方式也是 Source
。这使我可以非常灵活地连接单个源或将多个源合并到 websocket 或我想要连接的任何其他客户端。但是,在这种特定情况下,我试图将 ScalaFX 与 Akka Source 连接起来,但它没有按预期工作。
当我 运行 时,两个计数器下面的代码开始时都正常,但过了一会儿,其中一个停止了,再也没有恢复。我知道在使用 ScalaFX 时对线程有特殊的考虑,但我没有足够的知识来理解这里发生了什么或调试它。下面是 运行 的一个最小示例,问题应该在大约 5 秒后可见。
My question is:
How could I change this code to work as expected?
import akka.NotUsed
import scalafx.Includes._
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scalafx.application.JFXApp
import scalafx.beans.property.{IntegerProperty, StringProperty}
import scalafx.scene.Scene
import scalafx.scene.layout.BorderPane
import scalafx.scene.text.Text
import scala.concurrent.duration._
/**
* Created by henke on 2017-06-10.
*/
object MonitorApp extends JFXApp {
implicit val system = ActorSystem("monitor")
implicit val mat = ActorMaterializer()
val value1 = StringProperty("0")
val value2 = StringProperty("0")
stage = new JFXApp.PrimaryStage {
title = "Akka Stream Monitor"
scene = new Scene(600, 400) {
root = new BorderPane() {
left = new Text {
text <== value1
}
right = new Text {
text <== value2
}
}
}
}
override def stopApp() = system.terminate()
val monitor1 = createMonitor[Int]
val monitor2 = createMonitor[Int]
val marketChangeActor1 = monitor1
.to(Sink.foreach{ v =>
value1() = v.toString
}).run()
val marketChangeActor2 = monitor2
.to(Sink.foreach{ v =>
value2() = v.toString
}).run()
val monitorActor = Source[Int](1 to 100)
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.via(logToMonitorAndContinue(marketChangeActor1))
.map(_ * 10)
.via(logToMonitorAndContinue(marketChangeActor2))
.to(Sink.ignore).run()
def createMonitor[T]: Source[T, ActorRef] = Source.actorRef[T](Int.MaxValue, OverflowStrategy.fail)
def logToMonitorAndContinue[T](monitor: ActorRef): Flow[T, T, NotUsed] = {
Flow[T].map{ e =>
monitor ! e
e
}
}
}
您似乎在 actor 系统线程中为属性赋值(并因此影响 UI)。但是,与 UI 的所有交互都应在 JavaFX GUI 线程中完成。尝试包装 value1() = v.toString
和 Platform.runLater 调用中的第二个。
除了在 JavaFX-Swing 集成文档中,我无法找到关于使用 runLater
与 JavaFX 数据交互的明确声明,但这在 UI 库中很常见;例如,使用 SwingUtilities.invokeLater
方法的 Swing 也是如此。
我要解决的是以下情况:
给定无限 运行ning Akka Stream 我希望能够监视流的某些点。此时我能想到的将消息发送到 Actor 的最佳方式也是 Source
。这使我可以非常灵活地连接单个源或将多个源合并到 websocket 或我想要连接的任何其他客户端。但是,在这种特定情况下,我试图将 ScalaFX 与 Akka Source 连接起来,但它没有按预期工作。
当我 运行 时,两个计数器下面的代码开始时都正常,但过了一会儿,其中一个停止了,再也没有恢复。我知道在使用 ScalaFX 时对线程有特殊的考虑,但我没有足够的知识来理解这里发生了什么或调试它。下面是 运行 的一个最小示例,问题应该在大约 5 秒后可见。
My question is:
How could I change this code to work as expected?
import akka.NotUsed
import scalafx.Includes._
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scalafx.application.JFXApp
import scalafx.beans.property.{IntegerProperty, StringProperty}
import scalafx.scene.Scene
import scalafx.scene.layout.BorderPane
import scalafx.scene.text.Text
import scala.concurrent.duration._
/**
* Created by henke on 2017-06-10.
*/
object MonitorApp extends JFXApp {
implicit val system = ActorSystem("monitor")
implicit val mat = ActorMaterializer()
val value1 = StringProperty("0")
val value2 = StringProperty("0")
stage = new JFXApp.PrimaryStage {
title = "Akka Stream Monitor"
scene = new Scene(600, 400) {
root = new BorderPane() {
left = new Text {
text <== value1
}
right = new Text {
text <== value2
}
}
}
}
override def stopApp() = system.terminate()
val monitor1 = createMonitor[Int]
val monitor2 = createMonitor[Int]
val marketChangeActor1 = monitor1
.to(Sink.foreach{ v =>
value1() = v.toString
}).run()
val marketChangeActor2 = monitor2
.to(Sink.foreach{ v =>
value2() = v.toString
}).run()
val monitorActor = Source[Int](1 to 100)
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.via(logToMonitorAndContinue(marketChangeActor1))
.map(_ * 10)
.via(logToMonitorAndContinue(marketChangeActor2))
.to(Sink.ignore).run()
def createMonitor[T]: Source[T, ActorRef] = Source.actorRef[T](Int.MaxValue, OverflowStrategy.fail)
def logToMonitorAndContinue[T](monitor: ActorRef): Flow[T, T, NotUsed] = {
Flow[T].map{ e =>
monitor ! e
e
}
}
}
您似乎在 actor 系统线程中为属性赋值(并因此影响 UI)。但是,与 UI 的所有交互都应在 JavaFX GUI 线程中完成。尝试包装 value1() = v.toString
和 Platform.runLater 调用中的第二个。
除了在 JavaFX-Swing 集成文档中,我无法找到关于使用 runLater
与 JavaFX 数据交互的明确声明,但这在 UI 库中很常见;例如,使用 SwingUtilities.invokeLater
方法的 Swing 也是如此。