将 HttpEntity.Chunked 转换为数组 [String]
Convert HttpEntity.Chunked to Array[String]
我有以下问题。
我正在向服务器查询一些数据并将其返回为 HttpEntity.Chunked。
响应字符串看起来像这样,最多 10.000.000 行如下:
[{"name":"param1","value":122343,"time":45435345},
{"name":"param2","value":243,"time":4325435},
......]
现在我想将传入数据放入 Array[String] 中,其中每个 String 都是来自响应的一行,因为稍后应该将其导入到 apache spark 数据帧中。
目前我是这样做的:
//For the http request
trait StartHttpRequest {
implicit val system: ActorSystem
implicit val materializer: ActorMaterializer
def httpRequest(data: String, path: String, targetPort: Int, host: String): Future[HttpResponse] = {
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
Http().outgoingConnection(host, port = targetPort)
}
val responseFuture: Future[HttpResponse] =
Source.single(RequestBuilding.Post(uri = path, entity = HttpEntity(ContentTypes.`application/json`, data)))
.via(connectionFlow)
.runWith(Sink.head)
responseFuture
}
}
//result of the request
val responseFuture: Future[HttpResponse] = httpRequest(.....)
//convert to string
responseFuture.flatMap { response =>
response.status match {
case StatusCodes.OK =>
Unmarshal(response.entity).to[String]
}
}
//and then something like this, but with even more stupid stuff
responseFuture.onSuccess { str:String =>
masterActor! str.split("""\},\{""")
}
我的问题是,将结果放入数组的更好方法是什么?
如何直接解组响应实体?因为 .to[Array[String]] 例如不起作用。而且因为有这么多行要来,我可以用一个流来做,这样会更有效率吗?
乱序回答您的问题:
如何直接解组响应实体?
有一个 与解组 case classes 数组有关。
将结果放入数组的更好方法是什么?
我会利用分块的特性并使用流。这允许您同时进行字符串处理和 json 解析。
首先你需要一个容器class和解析器:
case class Data(name : String, value : Int, time : Long)
object MyJsonProtocol extends DefaultJsonProtocol {
implicit val dataFormat = jsonFormat3(Data)
}
然后你必须做一些操作来让 json 对象看起来正确:
//Drops the '[' and the ']' characters
val dropArrayMarkers =
Flow[ByteString].map(_.filterNot(b => b == '['.toByte || b == ']'.toByte))
val preppendBrace =
Flow[String].map(s => if(!s.startsWith("{")) "{" + s else s)
val appendBrace =
Flow[String].map(s => if(!s.endsWith("}")) s + "}" else s)
val parseJson =
Flow[String].map(_.parseJson.convertTo[Data])
最后,结合这些流将 ByteString 源转换为数据源对象:
def strSourceToDataSource(source : Source[ByteString,_]) : Source[Data, _] =
source.via(dropArrayMarkers)
.via(Framing.delimiter(ByteString("},{"), 256, true))
.map(_.utf8String)
.via(prependBrace)
.via(appendBrace)
.via(parseJson)
然后可以将此源引流到 Seq
个数据对象中:
val dataSeq : Future[Seq[Data]] =
responseFuture flatMap { response =>
response.status match {
case StatusCodes.OK =>
strSourceToDataSource(response.entity.dataBytes).runWith(Sink.seq)
}
}
我有以下问题。 我正在向服务器查询一些数据并将其返回为 HttpEntity.Chunked。 响应字符串看起来像这样,最多 10.000.000 行如下:
[{"name":"param1","value":122343,"time":45435345},
{"name":"param2","value":243,"time":4325435},
......]
现在我想将传入数据放入 Array[String] 中,其中每个 String 都是来自响应的一行,因为稍后应该将其导入到 apache spark 数据帧中。 目前我是这样做的:
//For the http request
trait StartHttpRequest {
implicit val system: ActorSystem
implicit val materializer: ActorMaterializer
def httpRequest(data: String, path: String, targetPort: Int, host: String): Future[HttpResponse] = {
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
Http().outgoingConnection(host, port = targetPort)
}
val responseFuture: Future[HttpResponse] =
Source.single(RequestBuilding.Post(uri = path, entity = HttpEntity(ContentTypes.`application/json`, data)))
.via(connectionFlow)
.runWith(Sink.head)
responseFuture
}
}
//result of the request
val responseFuture: Future[HttpResponse] = httpRequest(.....)
//convert to string
responseFuture.flatMap { response =>
response.status match {
case StatusCodes.OK =>
Unmarshal(response.entity).to[String]
}
}
//and then something like this, but with even more stupid stuff
responseFuture.onSuccess { str:String =>
masterActor! str.split("""\},\{""")
}
我的问题是,将结果放入数组的更好方法是什么? 如何直接解组响应实体?因为 .to[Array[String]] 例如不起作用。而且因为有这么多行要来,我可以用一个流来做,这样会更有效率吗?
乱序回答您的问题:
如何直接解组响应实体?
有一个
将结果放入数组的更好方法是什么?
我会利用分块的特性并使用流。这允许您同时进行字符串处理和 json 解析。
首先你需要一个容器class和解析器:
case class Data(name : String, value : Int, time : Long)
object MyJsonProtocol extends DefaultJsonProtocol {
implicit val dataFormat = jsonFormat3(Data)
}
然后你必须做一些操作来让 json 对象看起来正确:
//Drops the '[' and the ']' characters
val dropArrayMarkers =
Flow[ByteString].map(_.filterNot(b => b == '['.toByte || b == ']'.toByte))
val preppendBrace =
Flow[String].map(s => if(!s.startsWith("{")) "{" + s else s)
val appendBrace =
Flow[String].map(s => if(!s.endsWith("}")) s + "}" else s)
val parseJson =
Flow[String].map(_.parseJson.convertTo[Data])
最后,结合这些流将 ByteString 源转换为数据源对象:
def strSourceToDataSource(source : Source[ByteString,_]) : Source[Data, _] =
source.via(dropArrayMarkers)
.via(Framing.delimiter(ByteString("},{"), 256, true))
.map(_.utf8String)
.via(prependBrace)
.via(appendBrace)
.via(parseJson)
然后可以将此源引流到 Seq
个数据对象中:
val dataSeq : Future[Seq[Data]] =
responseFuture flatMap { response =>
response.status match {
case StatusCodes.OK =>
strSourceToDataSource(response.entity.dataBytes).runWith(Sink.seq)
}
}