ExecutionContext 导致 Akka 死信
ExecutionContext causes Akka dead letter
出于某种原因,我必须同时使用 gRPC 和 Akka。当这个演员开始成为顶级演员时,没有任何问题(在这个小演示中)。但是变成child actor后,就收不到任何消息,记录如下:
[default-akka.actor.default-dispatcher-6] [akka://default/user/Grpc] Message [AkkaMessage.package$GlobalStart] from Actor[akka://default/user/TrackerCore#-808631363] to Actor[akka://default/user/Grpc#-1834173068] was not delivered. [1] dead letters encountered.
示例核心:
class GrpcActor() extends Actor {
val ec = scala.concurrent.ExecutionContext.global
val service = grpcService.bindService(new GrpcServerImpl(), ec)
override def receive: Receive = {
case GlobalStart() => {
println("GlobalStart")
}
...
}
}
我尝试创建一个新的 ExecutionContext
,例如:
scala.concurrent.ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
为什么会这样,我该如何调试这样的死信问题(不抛出异常)?
更新:
抱歉,我没有在这里列出所有内容。我使用正常的 Main 方法将 GrpcActor
作为顶级演员进行测试,而 ScalaTest 将其作为子演员进行测试,这是一个错误。
class GrpcActorTest extends FlatSpec with Matchers{
implicit val system = ActorSystem()
val actor: ActorRef = system.actorOf(Props[GrpcActor])
actor ! GlobalStart()
}
正是这个空的测试套件主动关闭了整个 actor 系统。但问题出在这一行
val service = grpcService.bindService(new GrpcServerImpl(), ec)
GlobalStart()
的交付在关闭后延迟。
没有那条线,消息可以在关机前送达。
这是正常行为吗?
(我的猜测:碰巧 GlobalStart()
在该行的关闭消息之后排队,这做了一些繁重的工作并在时间上有所不同)
向其父级添加监督策略,将 println 添加至 actor 生命周期。有些东西会杀死你的演员。最后,如果你提供一个完整的例子,也许我可以说更多:)
解决该问题的一种方法是将 service
设为 lazy val
:
class GrpcActor extends Actor {
...
lazy val service = grpcService.bindService(new GrpcServerImpl(), ec)
...
}
A lazy val
对 long-运行 操作很有用:在这种情况下,它将 service
的初始化推迟到第一次使用时。如果没有 lazy
修饰符,service
在创建 actor 时被初始化。
另一种方法是在测试中添加 Thread.sleep
以防止 actor 系统在 actor 完全初始化之前关闭:
class GrpcActorTest extends FlatSpec with Matchers {
...
actor ! GlobalStart()
Thread.sleep(5000) // or whatever length of time is needed to initialize the actor
}
(作为旁注,考虑使用 Akka Testkit 进行 actor 测试。)
出于某种原因,我必须同时使用 gRPC 和 Akka。当这个演员开始成为顶级演员时,没有任何问题(在这个小演示中)。但是变成child actor后,就收不到任何消息,记录如下:
[default-akka.actor.default-dispatcher-6] [akka://default/user/Grpc] Message [AkkaMessage.package$GlobalStart] from Actor[akka://default/user/TrackerCore#-808631363] to Actor[akka://default/user/Grpc#-1834173068] was not delivered. [1] dead letters encountered.
示例核心:
class GrpcActor() extends Actor {
val ec = scala.concurrent.ExecutionContext.global
val service = grpcService.bindService(new GrpcServerImpl(), ec)
override def receive: Receive = {
case GlobalStart() => {
println("GlobalStart")
}
...
}
}
我尝试创建一个新的 ExecutionContext
,例如:
scala.concurrent.ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
为什么会这样,我该如何调试这样的死信问题(不抛出异常)?
更新:
抱歉,我没有在这里列出所有内容。我使用正常的 Main 方法将 GrpcActor
作为顶级演员进行测试,而 ScalaTest 将其作为子演员进行测试,这是一个错误。
class GrpcActorTest extends FlatSpec with Matchers{
implicit val system = ActorSystem()
val actor: ActorRef = system.actorOf(Props[GrpcActor])
actor ! GlobalStart()
}
正是这个空的测试套件主动关闭了整个 actor 系统。但问题出在这一行
val service = grpcService.bindService(new GrpcServerImpl(), ec)
GlobalStart()
的交付在关闭后延迟。
没有那条线,消息可以在关机前送达。
这是正常行为吗?
(我的猜测:碰巧 GlobalStart()
在该行的关闭消息之后排队,这做了一些繁重的工作并在时间上有所不同)
向其父级添加监督策略,将 println 添加至 actor 生命周期。有些东西会杀死你的演员。最后,如果你提供一个完整的例子,也许我可以说更多:)
解决该问题的一种方法是将 service
设为 lazy val
:
class GrpcActor extends Actor {
...
lazy val service = grpcService.bindService(new GrpcServerImpl(), ec)
...
}
A lazy val
对 long-运行 操作很有用:在这种情况下,它将 service
的初始化推迟到第一次使用时。如果没有 lazy
修饰符,service
在创建 actor 时被初始化。
另一种方法是在测试中添加 Thread.sleep
以防止 actor 系统在 actor 完全初始化之前关闭:
class GrpcActorTest extends FlatSpec with Matchers {
...
actor ! GlobalStart()
Thread.sleep(5000) // or whatever length of time is needed to initialize the actor
}
(作为旁注,考虑使用 Akka Testkit 进行 actor 测试。)