播放框架异步代码不等待结果
Play framework async code dos not wait to result
我有三个功能。
myFunc1:
def myFunc1(cluster: String, consumer: String) = Action.async { implicit request =>
kafkaManager.getConsumerIdentity(cluster, consumer) map { errorOrT =>
errorOrT.fold(
error => BadRequest(Json.obj("msg" -> error.msg)),
T=> {
Ok(Json.obj("summary" -> getSummary(cluster, consumer, T.Map.keys)))
})
}
获取摘要:
def getSummary(cluster: String, consumer: String, myMap: Iterable[String]) = {
var topics: Map[String, JsObject] = Map()
myMap.map { key =>
topicSummary(cluster, consumer, x).map(r => {
r.fold(l => {}, value => {
topics += key -> value
})
})
}
topics
}
和主题摘要:
def topicSummary(cluster: String, consumer: String, topic: String) = {
kafkaManager.getConsumedTopicState(cluster, consumer, topic).map { errorOrTopicSummary =>
errorOrTopicSummary.map(
topicSummary => {
Json.obj("totalLag" -> topicSummary.totalLag.getOrElse(None).toString(), "percentageCovered" -> topicSummary.percentageCovered)
})
}
}
结果是:
{"summary":()}
问题是 getSummary 没有等到结果。 我很想听听有关如何修复它的建议
很难确切地说出发生了什么,因为您没有在函数中添加明确的 return 类型。假设所有 Kafka 调用都是异步的,似乎 正在发生的是 topicSummary
return 一个 Future[JsObject]
,但是 getSummary
,这调用它,不等待它的结果,而是 return 立即创建(空)主题映射。
当你处理期货时,你需要:
- 通过你的代码路径处理 Futures(推荐)
- 明确地
Await
实现其结果的未来(不推荐)
您可以通过异步实现 getSummary
来解决这个问题,它看起来像这样:
myFunc1:
def myFunc1(cluster: String, consumer: String) = Action.async { implicit request =>
// NB: Since we're dealing with a Future within a Future, we
// use flatMap to combine them
kafkaManager.getConsumerIdentity(cluster, consumer).flatMap { errorOrT =>
errorOrT.fold(
error =>
// In the case of an error, return a no-op Future with .successful
Future.successful(BadRequest(Json.obj("msg" -> error.msg))),
// Otherwise, map over the results of get summary
T => getSummary(cluster, consumer, T.Map.keys).map { topics =>
Ok(Json.obj("summary" -> topics))
}
)
}
}
getSummary(大概代码):
def getSummary(cluster: String, consumer: String, myMap: Iterable[String]): Future[Map[String, JsObject]] = {
// For each key you expect a Future[JsObject], which you want to
// transform into a tuple of key/object
val topicKeys: List[Future[(String, JsObject)]] = myMap.toList.map { key =>
topicSummary(cluster, consumer, key)
// Map the future optional value with its key, defaulting
// to an empty object
.map(topicsOpt => key -> topicsOpt.getOrElse(Json.obj()))
// OPTIONAL: handle any error with an empty object
.recover {
case _ => key -> Json.obj()
}
}
// Finally, use Future.fold to turn a list of futures into a
// single Future sequence, then combine the (String, JsObject)
// tuples into a map
Future.fold(topicKeys)(Map.empty[String, JsObject])(_ + _)
}
在处理 future 时明确预期的 return 类型非常有用,至少在尝试理解中间状态时是这样。如果您最终得到太多嵌套地图和平面地图,请研究使用 for
推导式使事情看起来更清晰,但这是另一个问题。