为什么 Akka Streams 应用程序不能正常终止?
Why don't Akka Streams application terminate normally?
我使用 Alpakka Cassandra 库编写了这个简单的应用程序
package com.abhi
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink}
import com.datastax.driver.core.{Cluster, Row, SimpleStatement}
import scala.concurrent.Await
import scala.concurrent.duration._
object MyApp extends App {
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
implicit val session = Cluster
.builder
.addContactPoints(List("localhost") :_*)
.withPort(9042)
.withCredentials("foo", "bar")
.build
.connect("foobar")
val stmt = new SimpleStatement("SELECT col1, col2 FROM foo").setFetchSize(20)
val source = CassandraSource(stmt)
val toFoo = Flow[Row].map(row => Foo(row.getLong(0), row.Long(1)))
val sink = Sink.foreach[Foo](foo => println(foo.col1, foo.col2))
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit b =>
s =>
import GraphDSL.Implicits._
source.take(10) ~> toFoo ~> s
ClosedShape
})
// let us run the graph
val future = graph.run()
import actorSystem.dispatcher
future.onComplete{_ =>
session.close()
Await.result(actorSystem.terminate(), Duration.Inf)
}
Await.result(future, Duration.Inf)
System.exit(0)
}
case class Foo(col1: Long, col2: Long)
此应用程序运行完全符合预期,它在屏幕上打印了 10 行。
但是 post 它挂起。执行 System.exit(0)
调用时会抛出异常
Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0"
但应用程序仍然没有停止 运行。它只是挂起。
我不明白为什么这个应用程序不能正常终止(事实上它甚至不需要 system.exit(0) 调用。
退出此应用程序的唯一方法是通过控件 C。
这可能是因为 sbt 在它自己的 JVM 实例中运行您的代码,您的 System.exit
将退出 sbt 的 JVM 并给出上述结果。
您是否尝试设置:fork in run := true
在您的 sbt 构建中的某处?
我也不确定使用 actorSystem.dispatcher
来执行你的 onComplete
回调是个好主意(因为你用它来等待 actor 系统本身的终止)。
您可以尝试的东西:
import actorSystem.dispatcher
future.onComplete{ _ =>
session.close()
actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)
请注意,当仅剩的线程是守护线程时,JVM 将退出而您无需调用 System.exit
(参见示例 What is Daemon thread in Java?)。
我使用 Alpakka Cassandra 库编写了这个简单的应用程序
package com.abhi
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink}
import com.datastax.driver.core.{Cluster, Row, SimpleStatement}
import scala.concurrent.Await
import scala.concurrent.duration._
object MyApp extends App {
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
implicit val session = Cluster
.builder
.addContactPoints(List("localhost") :_*)
.withPort(9042)
.withCredentials("foo", "bar")
.build
.connect("foobar")
val stmt = new SimpleStatement("SELECT col1, col2 FROM foo").setFetchSize(20)
val source = CassandraSource(stmt)
val toFoo = Flow[Row].map(row => Foo(row.getLong(0), row.Long(1)))
val sink = Sink.foreach[Foo](foo => println(foo.col1, foo.col2))
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit b =>
s =>
import GraphDSL.Implicits._
source.take(10) ~> toFoo ~> s
ClosedShape
})
// let us run the graph
val future = graph.run()
import actorSystem.dispatcher
future.onComplete{_ =>
session.close()
Await.result(actorSystem.terminate(), Duration.Inf)
}
Await.result(future, Duration.Inf)
System.exit(0)
}
case class Foo(col1: Long, col2: Long)
此应用程序运行完全符合预期,它在屏幕上打印了 10 行。
但是 post 它挂起。执行 System.exit(0)
调用时会抛出异常
Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0"
但应用程序仍然没有停止 运行。它只是挂起。
我不明白为什么这个应用程序不能正常终止(事实上它甚至不需要 system.exit(0) 调用。
退出此应用程序的唯一方法是通过控件 C。
这可能是因为 sbt 在它自己的 JVM 实例中运行您的代码,您的 System.exit
将退出 sbt 的 JVM 并给出上述结果。
您是否尝试设置:fork in run := true
在您的 sbt 构建中的某处?
我也不确定使用 actorSystem.dispatcher
来执行你的 onComplete
回调是个好主意(因为你用它来等待 actor 系统本身的终止)。
您可以尝试的东西:
import actorSystem.dispatcher
future.onComplete{ _ =>
session.close()
actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)
请注意,当仅剩的线程是守护线程时,JVM 将退出而您无需调用 System.exit
(参见示例 What is Daemon thread in Java?)。