Akka actor 抛出空指针错误,原因未知
Akka actor throws null pointer error, reason unknown
我有一个调用服务的休息控制器,该服务又调用一个参与者从模拟数据库中获取查询。消息到达参与者,但应用程序在参与者响应之前崩溃,并且参与者出现空指针异常。我正在使用 akka http 作为控制器和路由指令来编写响应。这些是我的依赖项:
"com.typesafe.akka" %% "akka-http" % "10.1.8",
"com.typesafe.akka" %% "akka-actor" % "2.5.22",
"com.typesafe.akka" %% "akka-stream" % "2.5.22",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8"
class CacheActor extends Actor {
val tweetRepositoryInMemory: TweetRepositoryInMemory = new TweetRepositoryInMemory()
val log = Logging(context.system, this)
var tweetMap: scala.collection.mutable.Map[String, List[String]] =
scala.collection.mutable.Map[String, List[String]]()
// consult the in-memory map, if the username is not found, call the repository, update the map, and return the tweets
def queryRepo(username: String): Future[Option[List[String]]] = {
if (tweetMap isDefinedAt username) {
return Future(tweetMap.get(username))
} else {
var listOfTweetTexts: List[String] = List[String]()
val queryLimit = 10
val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
onComplete(resultTweets) {
case Success(tweets) =>
for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
tweetMap(username) = listOfTweetTexts
return Future(Option(listOfTweetTexts))
case Failure(t) =>
log.error("An error has occurred: " + t.getMessage)
return null
}
}
return null
}
def receive = {
case message: TweetQuery => // .take(message.limit)
val queryResult: Future[Option[List[String]]] = queryRepo(message.userName)
queryResult onComplete {
case Success(result) => sender() ! result
case Failure(t) => log.error("An error has occurred: " + t.getMessage)
}
}
}
堆栈跟踪会有所帮助,但我怀疑您 receive
中的这一行会导致 NPE:
queryResult onComplete {
你的 queryRepo
函数 returns null
如果 tweetMap
没有定义在 username
.
更新
原因如下:
queryRepo
函数 return 恰好在一种情况下是 Furture[Seq[String]]
if (tweetMap isDefinedAt username) {
return Future(tweetMap.get(username))
}
在 else 块中创建一个 Future[Seq[String]]
val resultTweets: Future[Seq[String]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
但你永远不会return那样。 相反, 您将回调传递给 Future,该回调在 futures 完成时异步执行,因此 onComplete
。 (我注意到你没有直接在 Future
上调用 onComplete
而是一个函数 onComplete
将未来和部分函数作为参数,我假设该函数注册了常规onComplete
回调。)
因此 if 语句的 else 块的结果是 Unit
而 不是 Future[Seq[String]]
密码
for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
tweetMap(username) = listOfTweetTexts
return Future(Option(listOfTweetTexts))
很可能在 queryRepo
已经 return 编辑了 null
之后执行。
def queryRepo(username: String): Future[Option[List[String]]] = {
if (tweetMap isDefinedAt username) {
...
} else {
...
}
return null // <--- here
}
更新结束
如果更改以下行:
val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
onComplete(resultTweets) {
case Success(tweets) =>
for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
tweetMap(username) = listOfTweetTexts
return Future(Option(listOfTweetTexts))
case Failure(t) =>
log.error("An error has occurred: " + t.getMessage)
return null
}
至:
tweetRepositoryInMemory.searchByUserName(username, queryLimit).map { tweets =>
// NOTE: This happens asynchronously in the `Future`. IT is better not to close over local variables
val listOfTweetTexts = for (tweet <- tweets) yield { tweet.text }
// again, access to an actor member from within a `Future` is not wise or rather a bug in the making.
// But I will not refactor that much here. The way to do this would be to send a message to self and process the mutable member within `receive`
tweetMap(username) = listOfTweetTexts
Option(listOfTweetTexts)
}
NullPointerException
不应再出现。
但是,我的印象是您可以使用 futures、actors 和 函数式编程 通常在 Scala 中。
例如,
- Actor 的可变本地状态仅在从其
receive
内部访问时才有效,而不是在异步 Future
或 Thread
中访问
- 通常不会在 scala 中使用 return 关键字
- 如果不是 java api 互操作性,就没有必要 ever return
null
。
- 还有很多点...
我有一个调用服务的休息控制器,该服务又调用一个参与者从模拟数据库中获取查询。消息到达参与者,但应用程序在参与者响应之前崩溃,并且参与者出现空指针异常。我正在使用 akka http 作为控制器和路由指令来编写响应。这些是我的依赖项:
"com.typesafe.akka" %% "akka-http" % "10.1.8",
"com.typesafe.akka" %% "akka-actor" % "2.5.22",
"com.typesafe.akka" %% "akka-stream" % "2.5.22",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8"
class CacheActor extends Actor {
val tweetRepositoryInMemory: TweetRepositoryInMemory = new TweetRepositoryInMemory()
val log = Logging(context.system, this)
var tweetMap: scala.collection.mutable.Map[String, List[String]] =
scala.collection.mutable.Map[String, List[String]]()
// consult the in-memory map, if the username is not found, call the repository, update the map, and return the tweets
def queryRepo(username: String): Future[Option[List[String]]] = {
if (tweetMap isDefinedAt username) {
return Future(tweetMap.get(username))
} else {
var listOfTweetTexts: List[String] = List[String]()
val queryLimit = 10
val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
onComplete(resultTweets) {
case Success(tweets) =>
for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
tweetMap(username) = listOfTweetTexts
return Future(Option(listOfTweetTexts))
case Failure(t) =>
log.error("An error has occurred: " + t.getMessage)
return null
}
}
return null
}
def receive = {
case message: TweetQuery => // .take(message.limit)
val queryResult: Future[Option[List[String]]] = queryRepo(message.userName)
queryResult onComplete {
case Success(result) => sender() ! result
case Failure(t) => log.error("An error has occurred: " + t.getMessage)
}
}
}
堆栈跟踪会有所帮助,但我怀疑您 receive
中的这一行会导致 NPE:
queryResult onComplete {
你的 queryRepo
函数 returns null
如果 tweetMap
没有定义在 username
.
更新
原因如下:
queryRepo
函数 return 恰好在一种情况下是 Furture[Seq[String]]
if (tweetMap isDefinedAt username) {
return Future(tweetMap.get(username))
}
在 else 块中创建一个 Future[Seq[String]]
val resultTweets: Future[Seq[String]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
但你永远不会return那样。 相反, 您将回调传递给 Future,该回调在 futures 完成时异步执行,因此 onComplete
。 (我注意到你没有直接在 Future
上调用 onComplete
而是一个函数 onComplete
将未来和部分函数作为参数,我假设该函数注册了常规onComplete
回调。)
因此 if 语句的 else 块的结果是 Unit
而 不是 Future[Seq[String]]
密码
for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
tweetMap(username) = listOfTweetTexts
return Future(Option(listOfTweetTexts))
很可能在 queryRepo
已经 return 编辑了 null
之后执行。
def queryRepo(username: String): Future[Option[List[String]]] = {
if (tweetMap isDefinedAt username) {
...
} else {
...
}
return null // <--- here
}
更新结束
如果更改以下行:
val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
onComplete(resultTweets) {
case Success(tweets) =>
for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
tweetMap(username) = listOfTweetTexts
return Future(Option(listOfTweetTexts))
case Failure(t) =>
log.error("An error has occurred: " + t.getMessage)
return null
}
至:
tweetRepositoryInMemory.searchByUserName(username, queryLimit).map { tweets =>
// NOTE: This happens asynchronously in the `Future`. IT is better not to close over local variables
val listOfTweetTexts = for (tweet <- tweets) yield { tweet.text }
// again, access to an actor member from within a `Future` is not wise or rather a bug in the making.
// But I will not refactor that much here. The way to do this would be to send a message to self and process the mutable member within `receive`
tweetMap(username) = listOfTweetTexts
Option(listOfTweetTexts)
}
NullPointerException
不应再出现。
但是,我的印象是您可以使用 futures、actors 和 函数式编程 通常在 Scala 中。
例如,
- Actor 的可变本地状态仅在从其
receive
内部访问时才有效,而不是在异步Future
或Thread
中访问
- 通常不会在 scala 中使用 return 关键字
- 如果不是 java api 互操作性,就没有必要 ever return
null
。 - 还有很多点...