通过 Akka/Spring 流从 InputStream 发送数据
Send data from InputStream over Akka/Spring stream
这是我之前的问题:
我已经成功地使用 FileIO.fromPath(Paths.get(file.toURI())) 通过 Akka 流发送文件并且它工作正常。但是,我想在发送文件之前对其进行压缩和加密。我创建了方法,它打开 FileInputStream,通过压缩流路由它,然后通过加密流路由它,现在我想使用 Akka 流将它定向到套接字中。
文件 -> FileInputStream -> CompressedInputStream -> EncryptedInputStream -?> Spring/Akka 流
重点是,我可以 compress/encrypt 在逐块读取文件的同时即时文件(我没有在磁盘上创建额外的文件),而且我不知道如何发送 InputStream (压缩和加密)通过 Akka/Spring 流(通过 Spring 流,我的意思是 Reactor API 项目下的 Akka 流)。
问题是:如何在不将整个 compressed/encrypted 文件随时保存到磁盘的情况下压缩、加密和发送文件?
实际上,有一个专门用于处理输入流等资源的源。它被称为 Source.unfoldResource
:
Source<ByteString, NotUsed> source = Source.unfoldResource(
() -> prepareEncryptedStream(),
is -> readChunk(is, 4096),
InputStream::close
);
Optional<ByteString> readChunk(InputStream is, int size) throws IOException {
byte[] data = new byte[size];
int read = is.read(data);
if (read < 0) {
return Optional.empty();
}
return Optional.of(ByteString.fromArray(data, 0, read));
}
InputStream prepareEncryptedStream() { ... }
这里 prepareCompressedFile()
是一种方法,它应该 return 您要从中创建反应流的加密流,readChunk()
是一种读取 ByteString
来自指定大小的 InputStream
。
如果您可以将压缩和加密例程表达为 ByteString -> ByteString
函数,那么您真的不需要这个;您需要做的就是将这些例程传递给 map()
流程:
Source<ByteString, CompletionStage<IOResult>> source =
FileIO.fromPath(Paths.get("somewhere"))
.map(bs -> compress(bs))
.map(bs -> encrypt(bs));
ByteString encrypt(ByteString bs) { ... }
ByteString compress(ByteString bs) { ... }
我(无意中)发现:StreamConverters.fromInputStream!
http://doc.akka.io/docs/akka/2.4.17/java/stream/stages-overview.html#fromInputStream
这是我之前的问题:
我已经成功地使用 FileIO.fromPath(Paths.get(file.toURI())) 通过 Akka 流发送文件并且它工作正常。但是,我想在发送文件之前对其进行压缩和加密。我创建了方法,它打开 FileInputStream,通过压缩流路由它,然后通过加密流路由它,现在我想使用 Akka 流将它定向到套接字中。
文件 -> FileInputStream -> CompressedInputStream -> EncryptedInputStream -?> Spring/Akka 流
重点是,我可以 compress/encrypt 在逐块读取文件的同时即时文件(我没有在磁盘上创建额外的文件),而且我不知道如何发送 InputStream (压缩和加密)通过 Akka/Spring 流(通过 Spring 流,我的意思是 Reactor API 项目下的 Akka 流)。
问题是:如何在不将整个 compressed/encrypted 文件随时保存到磁盘的情况下压缩、加密和发送文件?
实际上,有一个专门用于处理输入流等资源的源。它被称为 Source.unfoldResource
:
Source<ByteString, NotUsed> source = Source.unfoldResource(
() -> prepareEncryptedStream(),
is -> readChunk(is, 4096),
InputStream::close
);
Optional<ByteString> readChunk(InputStream is, int size) throws IOException {
byte[] data = new byte[size];
int read = is.read(data);
if (read < 0) {
return Optional.empty();
}
return Optional.of(ByteString.fromArray(data, 0, read));
}
InputStream prepareEncryptedStream() { ... }
这里 prepareCompressedFile()
是一种方法,它应该 return 您要从中创建反应流的加密流,readChunk()
是一种读取 ByteString
来自指定大小的 InputStream
。
如果您可以将压缩和加密例程表达为 ByteString -> ByteString
函数,那么您真的不需要这个;您需要做的就是将这些例程传递给 map()
流程:
Source<ByteString, CompletionStage<IOResult>> source =
FileIO.fromPath(Paths.get("somewhere"))
.map(bs -> compress(bs))
.map(bs -> encrypt(bs));
ByteString encrypt(ByteString bs) { ... }
ByteString compress(ByteString bs) { ... }
我(无意中)发现:StreamConverters.fromInputStream!
http://doc.akka.io/docs/akka/2.4.17/java/stream/stages-overview.html#fromInputStream