使用分块传输编码从 scala Play 服务器流式传输案例 class 个对象
Streaming case class objects from a scala Play server using chunked transfer encoding
所以,我正在使用 Play framework 2.7 来设置流媒体服务器。我想要做的是流式传输大约 500 个大小相似的自定义案例 class 对象。
这是生成流的控制器的一部分 -
def generate: Action[AnyContent] = Action {
val products = (1 to 500).map(Product(_, "some random string")).toList
Ok.chunked[Product](Source(products))
}
其中 Product
是我正在使用的自定义案例 class。隐式 writable
将此对象反序列化为 json.
这是处理此流的控制器的一部分 -
def process(): Action[AnyContent] = Action.async {
val request = ws.url(STREAMING_URL).withRequestTimeout(Duration.Inf).withMethod("GET")
request.stream().flatMap {
_.bodyAsSource
.map(_.utf8String)
.map { x => println(x); x }
.fold(0) { (acc, _) => acc + 1 }
.runWith(Sink.last)
.andThen {
case Success(v) => println(s"Total count - $v")
case Failure(_) => println("Error encountered")
}
}.map(_ => Ok)
}
我期望的是,我的案例 class 中的每个对象都作为一个单独的块传输并同样接收,以便它们可以单独序列化并由接收方使用。这意味着,使用上面的代码,我的期望是我应该收到恰好 500 个块,但这个值总是比那个多。
我看到的是,这500个对象中恰好有一个对象被拆分传输,并以2个块而不是1个块传输。
这是一个正常的对象,如接收方所见-
{
"id" : 494,
"name" : "some random string"
}
这是一个一分为二的对象 -
{
"id" : 463,
"name" : "some random strin
g"
}
因此,无法将其序列化回我的 Product
案例 class.
的实例
但是,如果我在发送者控制器中对源进行某种限制,我会按预期收到块。
例如,这在我每秒仅传输 5 个元素的情况下完全正常 -
def generate: Action[AnyContent] = Action {
val products = (1 to 500).map(Product(_, "some random string")).toList
Ok.chunked[Product](Source(products).throttle(5, 1.second))
}
谁能帮我理解为什么会这样?
如 here 所述,有一个 JsonFraming
可以将有效的 JSON 对象与传入的 ByteString
流分开。
你的情况可以这样试试
_.bodyAsSource.via(JsonFraming.objectScanner(Int.MaxValue)).map(_.utf8String)
所以,我正在使用 Play framework 2.7 来设置流媒体服务器。我想要做的是流式传输大约 500 个大小相似的自定义案例 class 对象。
这是生成流的控制器的一部分 -
def generate: Action[AnyContent] = Action {
val products = (1 to 500).map(Product(_, "some random string")).toList
Ok.chunked[Product](Source(products))
}
其中 Product
是我正在使用的自定义案例 class。隐式 writable
将此对象反序列化为 json.
这是处理此流的控制器的一部分 -
def process(): Action[AnyContent] = Action.async {
val request = ws.url(STREAMING_URL).withRequestTimeout(Duration.Inf).withMethod("GET")
request.stream().flatMap {
_.bodyAsSource
.map(_.utf8String)
.map { x => println(x); x }
.fold(0) { (acc, _) => acc + 1 }
.runWith(Sink.last)
.andThen {
case Success(v) => println(s"Total count - $v")
case Failure(_) => println("Error encountered")
}
}.map(_ => Ok)
}
我期望的是,我的案例 class 中的每个对象都作为一个单独的块传输并同样接收,以便它们可以单独序列化并由接收方使用。这意味着,使用上面的代码,我的期望是我应该收到恰好 500 个块,但这个值总是比那个多。
我看到的是,这500个对象中恰好有一个对象被拆分传输,并以2个块而不是1个块传输。
这是一个正常的对象,如接收方所见-
{
"id" : 494,
"name" : "some random string"
}
这是一个一分为二的对象 -
{
"id" : 463,
"name" : "some random strin
g"
}
因此,无法将其序列化回我的 Product
案例 class.
但是,如果我在发送者控制器中对源进行某种限制,我会按预期收到块。
例如,这在我每秒仅传输 5 个元素的情况下完全正常 -
def generate: Action[AnyContent] = Action {
val products = (1 to 500).map(Product(_, "some random string")).toList
Ok.chunked[Product](Source(products).throttle(5, 1.second))
}
谁能帮我理解为什么会这样?
如 here 所述,有一个 JsonFraming
可以将有效的 JSON 对象与传入的 ByteString
流分开。
你的情况可以这样试试
_.bodyAsSource.via(JsonFraming.objectScanner(Int.MaxValue)).map(_.utf8String)