Akka HTTP Streaming JSON 反序列化

Akka HTTP Streaming JSON Deserialization

是否可以动态反序列化一个外部,长度未知,ByteString从Akka HTTP流到域对象?


上下文

我调用一个 无限 HTTP 端点输出一个 JSON Array 不断增长的:

[
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    ...
] <- Never sees the daylight

我想play-iteratees-extras一定对你有帮助。这个库允许通过 Enumerator/Iteratee 模式解析 Json,当然,不要等待接收所有数据。

例如,以免构建 'infinite' 表示 'infinite' Json 数组的字节流。

import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}

var i = 0
var isFirstWas = false

val max = 10000

val stream = Enumerator("[".getBytes) andThen Enumerator.generateM {
  Future {
    i += 1
    if (i < max) {
      val json = Json.stringify(Json.obj(
        "prop" -> Random.nextBoolean(),
        "prop2" -> Random.nextBoolean(),
        "prop3" -> Random.nextInt(),
        "prop4" -> Random.alphanumeric.take(5).mkString("")
      ))

      val string = if (isFirstWas) {
        "," + json
      } else {
        isFirstWas = true
        json
      }


      Some(Codec.utf_8.encode(string))
    } else if (i == max) Some("]".getBytes) // <------ this is the last jsArray closing tag
    else None

  }
}

好的,这个值包含 10000 个(或更多)对象的 jsArray。让我们定义 case class ,它将包含我们数组中每个对象的数据。

case class Props(prop: Boolean, prop2: Boolean, prop3: Int, prop4: String)

现在编写解析器,它将解析每个项目

import play.extras.iteratees._    
import JsonBodyParser._
import JsonIteratees._
import JsonEnumeratees._

val parser = jsArray(jsValues(jsSimpleObject)) ><> Enumeratee.map { json =>
  for {
    prop <- json.\("prop").asOpt[Boolean]
    prop2 <- json.\("prop2").asOpt[Boolean]
    prop3 <- json.\("prop3").asOpt[Int]
    prop4 <- json.\("prop4").asOpt[String]
  } yield Props(prop, prop2, prop3, prop4)
}

jsArrayjsValuesjsSimpleObject 请参阅 doc。构建结果生成器:

val result = stream &> Encoding.decode() ><> parser
来自 JsonIteratees 包的

Encoding.decode() 会将字节解码为 CharStringresult 值的类型为 Enumerator[Option[Item]],您可以将一些迭代器应用于此枚举器以开始解析过程。

总的来说,我不知道你是如何接收字节的(解决方案在很大程度上取决于此),但我认为这显示了你的问题的可能解决方案之一。

我在尝试将 Twitter 流(无限字符串)解析为域对象时遇到了一个非常相似的问题。 我用 Json4s 解决了它,像这样:

case class Tweet(username: String, geolocation: Option[Geo])
case class Geo(latitude: Float, longitude: Float)
object Tweet{
    def apply(s: String): Tweet = {
        parse(StringInput(s), useBigDecimalForDouble = false, useBigIntForLong = false).extract[Tweet]
    }
}

然后我只是缓冲流并将其映射到推文:

val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(inputStream), "UTF-8"))
var line = reader.readLine()
while(line != null){
    store(Tweet.apply(line))
    line = reader.readLine()
}

Json4s 完全支持 Option(或对象内的自定义对象,如示例中的 Geo)。因此,您可以像我一样放置一个选项,如果该字段没有出现在 Json 中,它将被设置为 None.

希望对您有所帮助!

我想在这种情况下应该使用JsonFraming.objectScanner(Int.MaxValue)。正如文档所述:

Returns a Flow that implements a "brace counting" based framing operator for emitting valid JSON chunks. It scans the incoming data stream for valid JSON objects and returns chunks of ByteStrings containing only those valid chunks. Typical examples of data that one may want to frame using this operator include: Very large arrays

所以你可以得到这样的结果:

val response: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = serviceUrl))

response.onComplete {
  case Success(value) =>
    value.entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue))
      .map(_.utf8String)         // In case you have ByteString
      .map(decode[MyEntity](_))  // Use any Unmarshaller here
      .grouped(20)
      .runWith(Sink.ignore)      // Do whatever you need here 
  case Failure(exception) => log.error(exception, "Api call failed")
}