将对 OutputStream 的写入转换为 ServerResponse 可用的 Flux<DataBuffer>
Convert writes to OutputStream into a Flux<DataBuffer> usable by ServerResponse
我有一个遗留库,我必须用它来检索文件。这个遗留库在 InputStream 中没有 return,正如您通常期望的那样用于读取内容,但它期望它传递一个开放的 OutputStream,它可以写入。
我必须编写 Webflux REST 服务,将此 OutputStream 写入 org.springframework.web.reactive.function.server.ServerResponse 正文。
legacyLib.BlobRead(outputStream); // writes the stream to an outputstream, that has to be provided by me, and somehow has to end up in the ServerResponse
因为我想将 Stream 直接传递给 ServerResponse,我可能必须做这样的事情,对吗?
ServerResponse.ok().body(magicOutpuStreamToFluxConverter(), DataBuffer.class);
这是 RequestHandler 的重要部分。我遗漏了一些 errorhandling/catching 例外情况,通常可能不需要。请注意,我 publishedOn
一个不同的 Scheduler
用于读取(或者至少,这就是我想要做的),以便此阻塞读取不会干扰我的主事件线程:
private Mono<ServerResponse> writeToServerResponse(@NotNull FPTag tag) {
final long blobSize = tag.getBlobSize();
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(Flux.<DataBuffer>create((FluxSink<DataBuffer> emitter) -> {
// for a really big blob I want to read it in chunks, so that my server doesn't use too much memory
for(int i = 0; i < blobSize; i+= tagChunkSize) {
// new DataBuffer that is written to, then emitted later
DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory().allocateBuffer();
try (OutputStream outputStream = dataBuffer.asOutputStream()) {
// write to the outputstream of DataBuffer
tag.BlobReadPartial(outputStream, i, tagChunkSize, FPLibraryConstants.FP_OPTION_DEFAULT_OPTIONS);
// don't know if flushing is strictly neccessary
outputStream.flush();
} catch (IOException | FPLibraryException e) {
log.error("Error reading + writing from tag to http outputstream", e);
emitter.error(e);
}
emitter.next(dataBuffer);
}
// if blob is finished, send "complete" to my flux of DataBuffers
emitter.complete();
}, FluxSink.OverflowStrategy.BUFFER).publishOn(Schedulers.newElastic("centera")).doOnComplete(() -> closeQuietly(tag)), DataBuffer.class);
}
我有一个遗留库,我必须用它来检索文件。这个遗留库在 InputStream 中没有 return,正如您通常期望的那样用于读取内容,但它期望它传递一个开放的 OutputStream,它可以写入。
我必须编写 Webflux REST 服务,将此 OutputStream 写入 org.springframework.web.reactive.function.server.ServerResponse 正文。
legacyLib.BlobRead(outputStream); // writes the stream to an outputstream, that has to be provided by me, and somehow has to end up in the ServerResponse
因为我想将 Stream 直接传递给 ServerResponse,我可能必须做这样的事情,对吗?
ServerResponse.ok().body(magicOutpuStreamToFluxConverter(), DataBuffer.class);
这是 RequestHandler 的重要部分。我遗漏了一些 errorhandling/catching 例外情况,通常可能不需要。请注意,我 publishedOn
一个不同的 Scheduler
用于读取(或者至少,这就是我想要做的),以便此阻塞读取不会干扰我的主事件线程:
private Mono<ServerResponse> writeToServerResponse(@NotNull FPTag tag) {
final long blobSize = tag.getBlobSize();
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(Flux.<DataBuffer>create((FluxSink<DataBuffer> emitter) -> {
// for a really big blob I want to read it in chunks, so that my server doesn't use too much memory
for(int i = 0; i < blobSize; i+= tagChunkSize) {
// new DataBuffer that is written to, then emitted later
DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory().allocateBuffer();
try (OutputStream outputStream = dataBuffer.asOutputStream()) {
// write to the outputstream of DataBuffer
tag.BlobReadPartial(outputStream, i, tagChunkSize, FPLibraryConstants.FP_OPTION_DEFAULT_OPTIONS);
// don't know if flushing is strictly neccessary
outputStream.flush();
} catch (IOException | FPLibraryException e) {
log.error("Error reading + writing from tag to http outputstream", e);
emitter.error(e);
}
emitter.next(dataBuffer);
}
// if blob is finished, send "complete" to my flux of DataBuffers
emitter.complete();
}, FluxSink.OverflowStrategy.BUFFER).publishOn(Schedulers.newElastic("centera")).doOnComplete(() -> closeQuietly(tag)), DataBuffer.class);
}