使用分块传输编码从 scala Play 服务器流式传输案例 class 个对象

Streaming case class objects from a scala Play server using chunked transfer encoding

所以,我正在使用 Play framework 2.7 来设置流媒体服务器。我想要做的是流式传输大约 500 个大小相似的自定义案例 class 对象。

这是生成流的控制器的一部分 -

def generate: Action[AnyContent] = Action {
    val products = (1 to 500).map(Product(_, "some random string")).toList
    Ok.chunked[Product](Source(products))
  }

其中 Product 是我正在使用的自定义案例 class。隐式 writable 将此对象反序列化为 json.

这是处理此流的控制器的一部分 -

def process(): Action[AnyContent] = Action.async {
    val request = ws.url(STREAMING_URL).withRequestTimeout(Duration.Inf).withMethod("GET")
    request.stream().flatMap {
      _.bodyAsSource
        .map(_.utf8String)
        .map { x => println(x); x }
        .fold(0) { (acc, _) => acc + 1 }
        .runWith(Sink.last)
        .andThen {
          case Success(v) => println(s"Total count - $v")
          case Failure(_) => println("Error encountered")
        }
    }.map(_ => Ok)
  }

我期望的是,我的案例 class 中的每个对象都作为一个单独的块传输并同样接收,以便它们可以单独序列化并由接收方使用。这意味着,使用上面的代码,我的期望是我应该收到恰好 500 个块,但这个值总是比那个多。

我看到的是,这500个对象中恰好有一个对象被拆分传输,并以2个块而不是1个块传输。

这是一个正常的对象,如接收方所见-

{
  "id" : 494,
  "name" : "some random string"
}

这是一个一分为二的对象 -

{
  "id" : 463,
  "name" : "some random strin
g"
}

因此,无法将其序列化回我的 Product 案例 class.

的实例

但是,如果我在发送者控制器中对源进行某种限制,我会按预期收到块。

例如,这在我每秒仅传输 5 个元素的情况下完全正常 -

def generate: Action[AnyContent] = Action {
    val products = (1 to 500).map(Product(_, "some random string")).toList
    Ok.chunked[Product](Source(products).throttle(5, 1.second))
  }

谁能帮我理解为什么会这样?

here 所述,有一个 JsonFraming 可以将有效的 JSON 对象与传入的 ByteString 流分开。

你的情况可以这样试试

  _.bodyAsSource.via(JsonFraming.objectScanner(Int.MaxValue)).map(_.utf8String)