akka-http 分块响应连接
akka-http chunked response concatenation
我正在使用 akka-http
向发回分块响应的 http 服务发出请求。这是相关代码的样子:
val httpRequest: HttpRequest = //build the request
val request = Http().singleRequest(httpRequest)
request.flatMap { response =>
response.entity.dataBytes.runForeach { chunk =>
println("-----")
println(chunk.utf8String)
}
}
并且在命令行中产生的输出看起来像这样:
-----
{"data":
-----
"some text"}
-----
{"data":
-----
"this is a longer
-----
text"}
-----
{"data": "txt"}
-----
...
数据的逻辑部分 - 在这种情况下 json 以行尾符号 \r\n
结尾,但问题是 json 并不总是适合在单个 http 响应块中,如上例所示。
我的问题是 - 如何将传入的分块数据连接成完整的 json,以便生成的容器类型仍保持 Source[Out,M1]
或 Flow[In,Out,M2]
?我想遵循 akka-stream
.
的理念
更新:还值得一提的是,响应是无穷无尽的,聚合必须实时完成
找到解决方案:
val request: HttpRequest = //build the request
request.flatMap { response =>
response.entity.dataBytes.scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
.filter(_.contains("\r\n"))
.runForeach { json =>
println("-----")
println(json)
}
}
akka stream documentation 在食谱中有一个条目来解决这个问题:"Parsing lines from a stream of ByteString"。他们的解决方案非常冗长,但也可以处理单个块可以包含多行的情况。这似乎更可靠,因为块大小可以更改为足够大以处理多个 json 消息。
response.entity.dataBytes
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8096))
.mapAsyncUnordered(Runtime.getRuntime.availableProcessors()) { data =>
if (response.status == OK) {
val event: Future[Event] = Unmarshal(data).to[Event]
event.foreach(x => log.debug("Received event: {}.", x))
event.map(Right(_))
} else {
Future.successful(data.utf8String)
.map(Left(_))
}
}
唯一的要求是您知道一条记录的最大大小。如果您从小的开始,默认行为是在记录大于限制时失败。您可以将其设置为截断而不是失败,但是 JSON 的一部分没有任何意义。
我正在使用 akka-http
向发回分块响应的 http 服务发出请求。这是相关代码的样子:
val httpRequest: HttpRequest = //build the request
val request = Http().singleRequest(httpRequest)
request.flatMap { response =>
response.entity.dataBytes.runForeach { chunk =>
println("-----")
println(chunk.utf8String)
}
}
并且在命令行中产生的输出看起来像这样:
-----
{"data":
-----
"some text"}
-----
{"data":
-----
"this is a longer
-----
text"}
-----
{"data": "txt"}
-----
...
数据的逻辑部分 - 在这种情况下 json 以行尾符号 \r\n
结尾,但问题是 json 并不总是适合在单个 http 响应块中,如上例所示。
我的问题是 - 如何将传入的分块数据连接成完整的 json,以便生成的容器类型仍保持 Source[Out,M1]
或 Flow[In,Out,M2]
?我想遵循 akka-stream
.
更新:还值得一提的是,响应是无穷无尽的,聚合必须实时完成
找到解决方案:
val request: HttpRequest = //build the request
request.flatMap { response =>
response.entity.dataBytes.scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
.filter(_.contains("\r\n"))
.runForeach { json =>
println("-----")
println(json)
}
}
akka stream documentation 在食谱中有一个条目来解决这个问题:"Parsing lines from a stream of ByteString"。他们的解决方案非常冗长,但也可以处理单个块可以包含多行的情况。这似乎更可靠,因为块大小可以更改为足够大以处理多个 json 消息。
response.entity.dataBytes
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8096))
.mapAsyncUnordered(Runtime.getRuntime.availableProcessors()) { data =>
if (response.status == OK) {
val event: Future[Event] = Unmarshal(data).to[Event]
event.foreach(x => log.debug("Received event: {}.", x))
event.map(Right(_))
} else {
Future.successful(data.utf8String)
.map(Left(_))
}
}
唯一的要求是您知道一条记录的最大大小。如果您从小的开始,默认行为是在记录大于限制时失败。您可以将其设置为截断而不是失败,但是 JSON 的一部分没有任何意义。