当内容 (Flux<DataBuffer>) 包装在另一个对象中时,无法使用 WebClient 上传文件

Cannot upload a file using WebClient when its content ( Flux<DataBuffer>) is wrapped in another object

Spring 引导:2.5.6,Kotlin:1.5.31,kotlinx-coroutines-reactive:1.5.2

我正在尝试下载 pdf 并立即将其上传到 http://localhost:3000/upload,如下所示:

Class Mediator(val dataBuffer: Flux<DataBuffer>, val url: String)

runBlocking {
  getMediator()
    .flatMap { uploadFile(it) }
    .subscribe()
}

  private fun getMediator(): Mono<Mediator> {
    return WebClient.create(
            "https://server.com/assets/file.pdf")
        .get()
        .exchangeToMono { response ->
          Mono.just(
              Mediator(response.bodyToFlux(DataBuffer::class.java), "http://localhost:3000/upload"))
        }
  }
  private fun uploadFile(mediator: Mediator): Mono<ResponseEntity<Void>> {
    return WebClient.create(mediator.url)
        .put()
        .contentType(MediaType.APPLICATION_OCTET_STREAM)
        .body(mediator.dataBuffer)
        .retrieve()
        .toBodilessEntity()
  }

当我 运行 这段代码时,我可以看到我的服务器收到一个调用并创建一个空文件(0 大小)。不会引发错误。我无法解释为什么此代码不将字节传输到我的上传端点。在测试时,我意识到问题可能出在 Mediator 返回的 getMediator 对象中。 uploadFile 中的 body 不喜欢包装的 Flux<DataBuffer>

我找到了一种方法来做到这一点。只是回答我自己的问题,以防这可能对其他人有所帮助。

data class Mediator(val data: Flux<DataBuffer>, val url: String)
data class UploadUrl(val url: String)

runBlocking {
  getFileDataBuffer(url)
     .flatMap { prepareUpload(it.headers.contentLength, it.body) }
     .flatMap { uploadFile(it.url, it.dataBuffer!!)}
}.subscribe()

private fun getFileDataBuffer(url:String): Mono<ResponseEntity<Flux<DataBuffer>>> {
  return WebClient.create(url).get().retrieve().toEntityFlux()
}

private fun prepareUpload(length: Long, data: Flux<DataBuffer>?): Mono<Mediator> {
    return WebClient.create("http://localhost:3000/prepare")
        .post()
        .contentType(MediaType.APPLICATION_JSON)
        .bodyValue("""{"size":${length}}""")
        .exchangeToMono { res ->
          res.bodyToMono(String::class.java).map { parseJson<UploadUrl>(it) }.map {
            Mediator(data, it.url)
          }
        }
  }

private fun uploadFile(url: String, data: Flux<DataBuffer>): Mono<ResponseEntity<Void>> {
    return WebClient.create(url)
        .put()
        .contentType(MediaType.APPLICATION_OCTET_STREAM)
        .body(data)
        .retrieve()
        .toBodilessEntity()
  }