如何将 Reactor Flux<String> 转换为 InputStream
How to convert Reactor Flux<String> to InputStream
鉴于我有一个未知大小的 Flux<String>
,我怎样才能将它转换成其他库期望的 InputStream
?
例如,对于 WebClient,我可以使用这种方法实现
WebClient.get('example.com').exchange.flatMap { it.bodyToMono(InputStreamResource::class.java) }.map { it.inputStream }
但是当我有 Flux<String>
作为输入时,我不知道如何做同样的事情?
您可以将已知大小的 Flux<String>
转换为 Mono<byte[]>
进而可以使用组成一个InputStream
。检查一下(在 Java 中):
Flux<String> stringFlux = ...;
stringFlux.collect(() -> new ByteArrayOutputStream(),
(baos, str) -> {
try {
baos.write(str.getBytes());
} catch (IOException e) {
// do nothing
}
})
.map(baos -> new ByteArrayInputStream(baos.toByteArray()))
.map(inputStream -> ... // call other library);
这需要冷静 Flux<T>
,因为 collect()
将在 Flux
完成后成为 运行。对于未知大小的 Flux<T>
(假设每个 String
都是一个独立的对象),它变得更简单:
Flux<String> stringFlux = ...;
stringFlux.map(str -> new ByteArrayInputStream(str.getBytes()))
.map(inputStream -> ... // call other library);
可能有很多方法可以做到这一点。一种可能性是使用 PipedInputStream and PipedOutputStream.
它的工作方式是你 link 一个输出流到一个输入流,这样你写到输出流的所有东西都可以从 linked 输入流中读取,通过这样做, 在他们两个之间创建一个管道。
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = PipedOutputStream(in);
但有一个警告,根据管道流的文档,写入过程和读取过程必须发生在不同的线程上,否则我们可能会导致死锁。
所以,回到我们的反应流场景,我们可以创建一个管道(如上所述)并订阅 Flux
对象,并将您从中获得的数据写入管道输出流.无论你在那里写什么,都可以在管道的另一端读取,在相应的输入流中。此输入流是您可以与非反应性方法共享的输入流。
我们只需要格外小心,在单独的线程上订阅 Flux,例如subscribeOn(Schedulers.elastic())
.
这是此类订阅者的一个非常基本的实现:
class PipedStreamSubscriber extends BaseSubscriber<byte[]> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final PipedInputStream in;
private PipedOutputStream out;
PipedStreamSubscriber(PipedInputStream in) {
Objects.requireNonNull(in, "The input stream must not be null");
this.in = in;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
//change if you want to control back-pressure
super.hookOnSubscribe(subscription);
try {
this.out = new PipedOutputStream(in);
} catch (IOException e) {
//TODO throw a contextual exception here
throw new RuntimeException(e);
}
}
@Override
protected void hookOnNext(byte[] payload) {
try {
out.write(payload);
} catch (IOException e) {
//TODO throw a contextual exception here
throw new RuntimeException(e);
}
}
@Override
protected void hookOnComplete() {
close();
}
@Override
protected void hookOnError(Throwable error) {
//TODO handle the error or at least log it
logger.error("Failure processing stream", error);
close();
}
@Override
protected void hookOnCancel() {
close();
}
private void close() {
try {
if (out != null) {
out.close();
}
} catch (IOException e) {
//probably just ignore this one or simply log it
}
}
}
使用这个订阅者,我可以定义一个非常简单的实用方法,将 Flux<byte[]
变成 InputStream
,如下所示:
static InputStream createInputStream(Flux<byte[]> flux) {
PipedInputStream in = new PipedInputStream();
flux.subscribeOn(Schedulers.elastic())
.subscribe(new PipedStreamSubscriber(in));
return in;
}
请注意,当流程完成、发生错误或取消订阅时,我特别小心地关闭了输出流,否则我们 运行 有在读取端阻塞的风险,等待更多输入到达。关闭输出流是管道另一侧输入流结束的信号。
现在 InputStream 可以像任何常规流一样使用,因此您可以将它传递给非反应性方法,例如
Flux<byte[]> jedi = Flux.just("Luke\n", "Obi-Wan\n", "Yoda\n").map(String::getBytes);
try (InputStream in = createInputStream(jedi)) {
byte[] data = new byte[5];
int size = 0;
while ((size = in.read(data)) > 0) {
System.out.printf("%s", new String(data, 0, size));
}
}
上面的代码产生:
Luke
Obi-Wan
Yoda
您可以将Flux<DataBuffer>
减少到Mono<DataBuffer>
,然后转移到InputStream
。
在WebFlux中上传文件到GridFs的示例代码:
private GridFsTemplate gridFsTemplate;
public Mono<String> storeFile(FilePart filePart) {
HttpHeaders headers = filePart.headers();
String contentType = Objects.requireNonNull(headers.getContentType()).toString();
return filePart.content()
.reduce(DataBuffer::write).map(DataBuffer::asInputStream)
.map(input -> gridFsTemplate.store(input, filePart.filename(), contentType))
.map(ObjectId::toHexString);
}
Edwin 的回答对我没有用,因为上游的错误被订阅者吞没并且没有传播到 InputStream
的消费者。尽管如此,受埃德温回答的启发,我找到了不同的解决方案。这是一个使用 Flux<ByteArray>
并将其作为 InputStream
下游传递的示例。该示例包括解密以突出显示即使在 Flux<ByteStream>
完全消耗后仍可能操纵 OutputStream
的可能性,最终产生一个错误并传播到下游。
fun decryptAndGetInputStream(flux: Flux<ByteArray>, cipher: Cipher): Flux<InputStream> {
val inputStream = PipedInputStream()
val outputStream = PipedOutputStream(inputStream)
val isStreamEmitted = AtomicBoolean(false)
return flux.handle<InputStream> { byteArray, sink ->
try {
outputStream.write(cipher.update(byteArray))
// emit the input stream as soon as we get the first chunk of bytes
// make sure we do it only once
if (!isStreamEmitted.getAndSet(true)) {
sink.next(inputStream)
}
} catch (e: Exception) {
// catch all errors to pass them to the sink
sink.error(e)
}
}.doOnComplete {
// here we have a last chance to throw an error
outputStream.write(cipher.doFinal())
}.doOnTerminate {
// error thrown here won't get propagated downstream
// since this callback is triggered after flux's completion
outputStream.flush()
outputStream.close()
}
}
这里的问题是使用 handle
运算符生成一个 Flux
最多发出一个项目。与 Mono
不同,Flux
不会在第一次发射后立即终止。虽然它不会再发射任何项目,但它会保持“打开”状态以发射在第一次发射后发生的最终错误。
下面是一个使用 Flux<InputStream>
并将其转换为 Mono
的示例。
fun decryptAndGetProcessingResult(flux: Flux<ByteArray>, cipher: Cipher): Mono<Result> =
decryptAndGetInputStream(flux, cipher)
// the following operator gets called at most once
.flatMap { inputStream ->
// wrap the blocking operation into mono
// subscribed on another thread to avoid deadlocks
Mono.fromCallable {
processInputStream(inputStream)
}.subscribeOn(Schedulers.elastic())
// to get mono out of flux we implement reduce operator
// although it gets never called
}.reduce { t, _ -> t }
这里的另一个优点是使用 InputStream 的线程在第一个数据块可用之前不会阻塞。
鉴于我有一个未知大小的 Flux<String>
,我怎样才能将它转换成其他库期望的 InputStream
?
例如,对于 WebClient,我可以使用这种方法实现
WebClient.get('example.com').exchange.flatMap { it.bodyToMono(InputStreamResource::class.java) }.map { it.inputStream }
但是当我有 Flux<String>
作为输入时,我不知道如何做同样的事情?
您可以将已知大小的 Flux<String>
转换为 Mono<byte[]>
进而可以使用组成一个InputStream
。检查一下(在 Java 中):
Flux<String> stringFlux = ...;
stringFlux.collect(() -> new ByteArrayOutputStream(),
(baos, str) -> {
try {
baos.write(str.getBytes());
} catch (IOException e) {
// do nothing
}
})
.map(baos -> new ByteArrayInputStream(baos.toByteArray()))
.map(inputStream -> ... // call other library);
这需要冷静 Flux<T>
,因为 collect()
将在 Flux
完成后成为 运行。对于未知大小的 Flux<T>
(假设每个 String
都是一个独立的对象),它变得更简单:
Flux<String> stringFlux = ...;
stringFlux.map(str -> new ByteArrayInputStream(str.getBytes()))
.map(inputStream -> ... // call other library);
可能有很多方法可以做到这一点。一种可能性是使用 PipedInputStream and PipedOutputStream.
它的工作方式是你 link 一个输出流到一个输入流,这样你写到输出流的所有东西都可以从 linked 输入流中读取,通过这样做, 在他们两个之间创建一个管道。
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = PipedOutputStream(in);
但有一个警告,根据管道流的文档,写入过程和读取过程必须发生在不同的线程上,否则我们可能会导致死锁。
所以,回到我们的反应流场景,我们可以创建一个管道(如上所述)并订阅 Flux
对象,并将您从中获得的数据写入管道输出流.无论你在那里写什么,都可以在管道的另一端读取,在相应的输入流中。此输入流是您可以与非反应性方法共享的输入流。
我们只需要格外小心,在单独的线程上订阅 Flux,例如subscribeOn(Schedulers.elastic())
.
这是此类订阅者的一个非常基本的实现:
class PipedStreamSubscriber extends BaseSubscriber<byte[]> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final PipedInputStream in;
private PipedOutputStream out;
PipedStreamSubscriber(PipedInputStream in) {
Objects.requireNonNull(in, "The input stream must not be null");
this.in = in;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
//change if you want to control back-pressure
super.hookOnSubscribe(subscription);
try {
this.out = new PipedOutputStream(in);
} catch (IOException e) {
//TODO throw a contextual exception here
throw new RuntimeException(e);
}
}
@Override
protected void hookOnNext(byte[] payload) {
try {
out.write(payload);
} catch (IOException e) {
//TODO throw a contextual exception here
throw new RuntimeException(e);
}
}
@Override
protected void hookOnComplete() {
close();
}
@Override
protected void hookOnError(Throwable error) {
//TODO handle the error or at least log it
logger.error("Failure processing stream", error);
close();
}
@Override
protected void hookOnCancel() {
close();
}
private void close() {
try {
if (out != null) {
out.close();
}
} catch (IOException e) {
//probably just ignore this one or simply log it
}
}
}
使用这个订阅者,我可以定义一个非常简单的实用方法,将 Flux<byte[]
变成 InputStream
,如下所示:
static InputStream createInputStream(Flux<byte[]> flux) {
PipedInputStream in = new PipedInputStream();
flux.subscribeOn(Schedulers.elastic())
.subscribe(new PipedStreamSubscriber(in));
return in;
}
请注意,当流程完成、发生错误或取消订阅时,我特别小心地关闭了输出流,否则我们 运行 有在读取端阻塞的风险,等待更多输入到达。关闭输出流是管道另一侧输入流结束的信号。
现在 InputStream 可以像任何常规流一样使用,因此您可以将它传递给非反应性方法,例如
Flux<byte[]> jedi = Flux.just("Luke\n", "Obi-Wan\n", "Yoda\n").map(String::getBytes);
try (InputStream in = createInputStream(jedi)) {
byte[] data = new byte[5];
int size = 0;
while ((size = in.read(data)) > 0) {
System.out.printf("%s", new String(data, 0, size));
}
}
上面的代码产生:
Luke
Obi-Wan
Yoda
您可以将Flux<DataBuffer>
减少到Mono<DataBuffer>
,然后转移到InputStream
。
在WebFlux中上传文件到GridFs的示例代码:
private GridFsTemplate gridFsTemplate;
public Mono<String> storeFile(FilePart filePart) {
HttpHeaders headers = filePart.headers();
String contentType = Objects.requireNonNull(headers.getContentType()).toString();
return filePart.content()
.reduce(DataBuffer::write).map(DataBuffer::asInputStream)
.map(input -> gridFsTemplate.store(input, filePart.filename(), contentType))
.map(ObjectId::toHexString);
}
Edwin 的回答对我没有用,因为上游的错误被订阅者吞没并且没有传播到 InputStream
的消费者。尽管如此,受埃德温回答的启发,我找到了不同的解决方案。这是一个使用 Flux<ByteArray>
并将其作为 InputStream
下游传递的示例。该示例包括解密以突出显示即使在 Flux<ByteStream>
完全消耗后仍可能操纵 OutputStream
的可能性,最终产生一个错误并传播到下游。
fun decryptAndGetInputStream(flux: Flux<ByteArray>, cipher: Cipher): Flux<InputStream> {
val inputStream = PipedInputStream()
val outputStream = PipedOutputStream(inputStream)
val isStreamEmitted = AtomicBoolean(false)
return flux.handle<InputStream> { byteArray, sink ->
try {
outputStream.write(cipher.update(byteArray))
// emit the input stream as soon as we get the first chunk of bytes
// make sure we do it only once
if (!isStreamEmitted.getAndSet(true)) {
sink.next(inputStream)
}
} catch (e: Exception) {
// catch all errors to pass them to the sink
sink.error(e)
}
}.doOnComplete {
// here we have a last chance to throw an error
outputStream.write(cipher.doFinal())
}.doOnTerminate {
// error thrown here won't get propagated downstream
// since this callback is triggered after flux's completion
outputStream.flush()
outputStream.close()
}
}
这里的问题是使用 handle
运算符生成一个 Flux
最多发出一个项目。与 Mono
不同,Flux
不会在第一次发射后立即终止。虽然它不会再发射任何项目,但它会保持“打开”状态以发射在第一次发射后发生的最终错误。
下面是一个使用 Flux<InputStream>
并将其转换为 Mono
的示例。
fun decryptAndGetProcessingResult(flux: Flux<ByteArray>, cipher: Cipher): Mono<Result> =
decryptAndGetInputStream(flux, cipher)
// the following operator gets called at most once
.flatMap { inputStream ->
// wrap the blocking operation into mono
// subscribed on another thread to avoid deadlocks
Mono.fromCallable {
processInputStream(inputStream)
}.subscribeOn(Schedulers.elastic())
// to get mono out of flux we implement reduce operator
// although it gets never called
}.reduce { t, _ -> t }
这里的另一个优点是使用 InputStream 的线程在第一个数据块可用之前不会阻塞。