通过 Akka/Spring 流从 InputStream 发送数据

Send data from InputStream over Akka/Spring stream

这是我之前的问题:

我已经成功地使用 FileIO.fromPath(Paths.get(file.toURI())) 通过 Akka 流发送文件并且它工作正常。但是,我想在发送文件之前对其进行压缩和加密。我创建了方法,它打开 FileInputStream,通过压缩流路由它,然后通过加密流路由它,现在我想使用 Akka 流将它定向到套接字中。

文件 -> FileInputStream -> CompressedInputStream -> EncryptedInputStream -?> Spring/Akka 流

重点是,我可以 compress/encrypt 在逐块读取文件的同时即时文件(我没有在磁盘上创建额外的文件),而且我不知道如何发送 InputStream (压缩和加密)通过 Akka/Spring 流(通过 Spring 流,我的意思是 Reactor API 项目下的 Akka 流)。

问题是:如何在不将整个 compressed/encrypted 文件随时保存到磁盘的情况下压缩、加密和发送文件?

实际上,有一个专门用于处理输入流等资源的源。它被称为 Source.unfoldResource:

Source<ByteString, NotUsed> source = Source.unfoldResource(
    () -> prepareEncryptedStream(),
    is -> readChunk(is, 4096),
    InputStream::close
);

Optional<ByteString> readChunk(InputStream is, int size) throws IOException {
    byte[] data = new byte[size];
    int read = is.read(data);
    if (read < 0) {
        return Optional.empty();
    }
    return Optional.of(ByteString.fromArray(data, 0, read));
}

InputStream prepareEncryptedStream() { ... }

这里 prepareCompressedFile() 是一种方法,它应该 return 您要从中创建反应流的加密流,readChunk() 是一种读取 ByteString 来自指定大小的 InputStream

如果您可以将压缩和加密例程表达为 ByteString -> ByteString 函数,那么您真的不需要这个;您需要做的就是将这些例程传递给 map() 流程:

Source<ByteString, CompletionStage<IOResult>> source =
    FileIO.fromPath(Paths.get("somewhere"))
        .map(bs -> compress(bs))
        .map(bs -> encrypt(bs));

ByteString encrypt(ByteString bs) { ... }

ByteString compress(ByteString bs) { ... }

我(无意中)发现:StreamConverters.fromInputStream!

http://doc.akka.io/docs/akka/2.4.17/java/stream/stages-overview.html#fromInputStream