使用 akka http 分块响应流式传输 JSON 个对象
Streaming JSON objects using akka http chunked response
像这样使用 akka-http 是滥用还是有某种危险?
在服务器上
def source(consumerOffset: UUID) =
readJournal.eventsByTag(“MyTag", consumerOffset).map(_.asJson)
pathPrefix("stream" / Segment.map(UUID.fromString)) { offset =>
pathEndOrSingleSlash {
get {
complete {
HttpResponse(
StatusCodes.OK,
entity = HttpEntity(ContentTypes.`application/json`, source(offset))
)
}
}
}
}
然后在客户端
Source.single(HttpRequest("http://localhost:9000/stream"))
.mapAsync(1) { r =>
Http().singleRequest(r).map { res =>
res.entity.dataBytes.map(_.parse[Event])
}
}
.flatMapConcat(identity).mapAsync(processEvent)
UPD:
- 是否保证我发送的块在客户端是相同的。
- 是否可以使用可能无穷无尽的块进行响应?
- 这种响应的正确 Content-Type 是什么?
更新 2:
Akka 2.4.9 添加了使用流进行响应的能力。基本上做的完全一样,提供一些语法糖。
见 docs.
按顺序回答您的问题:
- 是的,客户端将收到
BytesString
以 json 编码的对象表示。
- 是的,在服务器端有一个永不终止的流源很好。
- 您在问题(
application/json
)中指定的内容是正确的。
像这样使用 akka-http 是滥用还是有某种危险?
在服务器上
def source(consumerOffset: UUID) =
readJournal.eventsByTag(“MyTag", consumerOffset).map(_.asJson)
pathPrefix("stream" / Segment.map(UUID.fromString)) { offset =>
pathEndOrSingleSlash {
get {
complete {
HttpResponse(
StatusCodes.OK,
entity = HttpEntity(ContentTypes.`application/json`, source(offset))
)
}
}
}
}
然后在客户端
Source.single(HttpRequest("http://localhost:9000/stream"))
.mapAsync(1) { r =>
Http().singleRequest(r).map { res =>
res.entity.dataBytes.map(_.parse[Event])
}
}
.flatMapConcat(identity).mapAsync(processEvent)
UPD:
- 是否保证我发送的块在客户端是相同的。
- 是否可以使用可能无穷无尽的块进行响应?
- 这种响应的正确 Content-Type 是什么?
更新 2:
Akka 2.4.9 添加了使用流进行响应的能力。基本上做的完全一样,提供一些语法糖。 见 docs.
按顺序回答您的问题:
- 是的,客户端将收到
BytesString
以 json 编码的对象表示。 - 是的,在服务器端有一个永不终止的流源很好。
- 您在问题(
application/json
)中指定的内容是正确的。