播放框架异步代码不等待结果

Play framework async code dos not wait to result

我有三个功能。

myFunc1:

def myFunc1(cluster: String, consumer: String) = Action.async { implicit request =>
  kafkaManager.getConsumerIdentity(cluster, consumer) map { errorOrT =>
    errorOrT.fold(
      error => BadRequest(Json.obj("msg" -> error.msg)),
      T=> {
        Ok(Json.obj("summary" -> getSummary(cluster, consumer, T.Map.keys)))
      })
}

获取摘要:

def getSummary(cluster: String, consumer: String, myMap: Iterable[String]) = {
  var topics: Map[String, JsObject] = Map()
  myMap.map { key =>
    topicSummary(cluster, consumer, x).map(r => {
      r.fold(l => {}, value => {
        topics += key -> value
      })
    })
  }
  topics
}

和主题摘要:

def topicSummary(cluster: String, consumer: String, topic: String) = {
  kafkaManager.getConsumedTopicState(cluster, consumer, topic).map { errorOrTopicSummary =>
    errorOrTopicSummary.map(
      topicSummary => {
        Json.obj("totalLag" -> topicSummary.totalLag.getOrElse(None).toString(), "percentageCovered" -> topicSummary.percentageCovered)
    })
  }
}

结果是:

{"summary":()}

问题是 getSummary 没有等到结果。 我很想听听有关如何修复它的建议

很难确切地说出发生了什么,因为您没有在函数中添加明确的 return 类型。假设所有 Kafka 调用都是异步的,似乎 正在发生的是 topicSummary return 一个 Future[JsObject],但是 getSummary,这调用它,不等待它的结果,而是 return 立即创建(空)主题映射。

当你处理期货时,你需要:

  • 通过你的代码路径处理 Futures(推荐)
  • 明确地 Await 实现其结果的未来(不推荐)

您可以通过异步实现 getSummary 来解决这个问题,它看起来像这样:

myFunc1:

def myFunc1(cluster: String, consumer: String) = Action.async { implicit request =>
  // NB: Since we're dealing with a Future within a Future, we
  // use flatMap to combine them
  kafkaManager.getConsumerIdentity(cluster, consumer).flatMap { errorOrT =>
    errorOrT.fold(
      error => 
        // In the case of an error, return a no-op Future with .successful
        Future.successful(BadRequest(Json.obj("msg" -> error.msg))),

      // Otherwise, map over the results of get summary
      T => getSummary(cluster, consumer, T.Map.keys).map { topics =>
        Ok(Json.obj("summary" -> topics))
      }
    )
  }
}

getSummary(大概代码):

def getSummary(cluster: String, consumer: String, myMap: Iterable[String]): Future[Map[String, JsObject]] = {

  // For each key you expect a Future[JsObject], which you want to
  // transform into a tuple of key/object
  val topicKeys: List[Future[(String, JsObject)]] = myMap.toList.map { key =>
    topicSummary(cluster, consumer, key)
      // Map the future optional value with its key, defaulting
      // to an empty object
      .map(topicsOpt => key -> topicsOpt.getOrElse(Json.obj()))

      // OPTIONAL: handle any error with an empty object
      .recover {
        case _ => key -> Json.obj()
      }
  }

  // Finally, use Future.fold to turn a list of futures into a 
  // single Future sequence, then combine the (String, JsObject)
  // tuples into a map
  Future.fold(topicKeys)(Map.empty[String, JsObject])(_ + _)
}

在处理 future 时明确预期的 return 类型非常有用,至少在尝试理解中间状态时是这样。如果您最终得到太多嵌套地图和平面地图,请研究使用 for 推导式使事情看起来更清晰,但这是另一个问题。