升级S3Client到S3AsyncClient如何获取Publisher<ByteBuffer>?

Upgrading S3Client to S3AsyncClient how to get Publisher<ByteBuffer>?

我正在将 aws 云服务中的 S3Client 升级到 S3AsyncClient。

我有这个函数可以转换为异步:

public PutObjectResponse uploadFileByUrl(String fileUrl, String builderId, PbModel category, String categoryId)

    URL url = new URL(fileUrl);
    String[] fileNameArray = url.getFile().split("\.");
    var uniqueFileName = prepareFileName(fileNameArray[fileNameArray.length -1]);

    URLConnection connection = url.openConnection();

    long contentSize = connection.getContentLengthLong();
    InputStream inputStream = connection.getInputStream();

    return s3Client.putObject(myObjectRequestBuild, RequestBody.fromInputStream(inputStream, contentSize));
}

我有这个函数可以转换为异步:

public CompletableFuture<PutObjectResponse> uploadFileByUrl(String fileUrl, String builderId, PbModel category, String categoryId)

    URL url = new URL(fileUrl);
    String[] fileNameArray = url.getFile().split("\.");
    var uniqueFileName = prepareFileName(fileNameArray[fileNameArray.length -1]);

    URLConnection connection = url.openConnection();

    long contentSize = connection.getContentLengthLong();
    InputStream inputStream = connection.getInputStream();

    return asyncClient.putObject(myObjectRequestBuild, AsyncRequestBody.fromPublisher(???));
}
    

正如您在上面的第二种方法中看到的那样,当我将上面的第一个函数转换为异步时,我需要使用 AsyncRequestBody 而不是 RequestBody。 AsyncRequestBody 没有 fromInputStream 方法,但它有我想使用的 fromPublisher 方法,fromPublisher 方法作为 Publisher 的参数类型获取。

所以我的问题是如何将我的 inputStream 转换为 Publisher?

AsyncRequestBody doesn't have fromInputStream method

没错,但是它还有很多其他方法可以创建 AsyncRequestBody:

  1. fromByteBuffer(ByteBuffer byteBuffer)
  2. fromBytes(byte[] bytes)
  3. fromFile(File file)
  4. fromFile(Path path)
  5. fromPublisher(org.reactivestreams.Publisher<ByteBuffer> publisher)
  6. fromString(String string)
  7. fromString(String string, Charset cs)

考虑到以上情况,您有几个解决方案:

  1. 使用IOUtils.toByteArray(inputStream)(或Java 9+,inputStream.readAllBytes())将InputStream转换为字节数组,然后直接使用fromBytes
  2. 如上,但随后使用 ByteBuffer.wrap(byteArray)byte[] 转换为 ByteBuffer,然后使用 fromByteBuffer
  3. 创建一个新的File object specifying a filename, copy the contents of the InputStream to the file's FileOutputStream using IOUtils.copy(),然后使用fromFile(File file)
  4. 如上,但不是提供 File 对象,而是在写入 FileOutputStream
  5. 后提供它到 fromFile(Path path) 的路径
  6. 使用 DataBufferUtils.readByteChannel from the Spring Framework, Akka StreamConverters 等将 InputSteam 转换为 Publisher<ByteArray>,然后使用 fromPublisher
  7. InputStream转换为UTF-8编码String然后使用fromString(String string)(如果是UTF-8编码则无需指定Charset
  8. InputStream 转换为非 UTF-8 编码 String 然后使用 fromString(String string, Charset cs),指定 CharSet

当然,上面的一些内容在你的情况下是多余的,例如fromFile(Path path) 适用于您已经存储的文件,将 InputSteam 转换为 Publisher<ByteArray> 会很痛苦,但为了完整性,我已经包含了所有可能的解决方案。


我会使用解决方案 #1 来解决这个问题,从而产生上述代码中最干净、最简单的代码。

使用 inputStream.readAllBytes()InputStream 转换为 byte[],然后使用 AsyncRequestBody.fromBytes(...).

这应该有效:

public CompletableFuture<PutObjectResponse> uploadFileByUrl(String fileUrl, String builderId, PbModel category, String categoryId)
    URL url = new URL(fileUrl);
    String[] fileNameArray = url.getFile().split("\.");
    var uniqueFileName = prepareFileName(fileNameArray[fileNameArray.length -1]);

    URLConnection connection = url.openConnection();

    long contentSize = connection.getContentLengthLong();
    InputStream inputStream = connection.getInputStream();

    byte[] fileByteArray = inputStream.readAllBytes();

    return asyncClient.putObject(myObjectRequestBuild, AsyncRequestBody.fromBytes(fileByteArray));
}