如何将 Reactor Flux<String> 转换为 InputStream

How to convert Reactor Flux<String> to InputStream

鉴于我有一个未知大小的 Flux<String>,我怎样才能将它转换成其他库期望的 InputStream

例如,对于 WebClient,我可以使用这种方法实现

WebClient.get('example.com').exchange.flatMap { it.bodyToMono(InputStreamResource::class.java) }.map { it.inputStream }

但是当我有 Flux<String> 作为输入时,我不知道如何做同样的事情?

您可以将已知大小的 Flux<String> 转换为 Mono<byte[]> 进而可以使用组成一个InputStream。检查一下(在 Java 中):

Flux<String> stringFlux = ...;
stringFlux.collect(() -> new ByteArrayOutputStream(),
                   (baos, str) -> {
                       try {
                           baos.write(str.getBytes());
                       } catch (IOException e) {
                           // do nothing
                       }
                   })
          .map(baos -> new ByteArrayInputStream(baos.toByteArray()))
          .map(inputStream -> ... // call other library);

这需要冷静 Flux<T>,因为 collect() 将在 Flux 完成后成为 运行。对于未知大小的 Flux<T>(假设每个 String 都是一个独立的对象),它变得更简单:

Flux<String> stringFlux = ...;
stringFlux.map(str -> new ByteArrayInputStream(str.getBytes()))
          .map(inputStream -> ... // call other library);

可能有很多方法可以做到这一点。一种可能性是使用 PipedInputStream and PipedOutputStream.

它的工作方式是你 link 一个输出流到一个输入流,这样你写到输出流的所有东西都可以从 linked 输入流中读取,通过这样做, 在他们两个之间创建一个管道。

PipedInputStream in = new PipedInputStream();
PipedOutputStream out = PipedOutputStream(in);

但有一个警告,根据管道流的文档,写入过程和读取过程必须发生在不同的线程上,否则我们可能会导致死锁。

所以,回到我们的反应流场景,我们可以创建一个管道(如上所述)并订阅 Flux 对象,并将您从中获得的数据写入管道输出流.无论你在那里写什么,都可以在管道的另一端读取,在相应的输入流中。此输入流是您可以与非反应性方法共享的输入流。

我们只需要格外小心,在单独的线程上订阅 Flux,例如subscribeOn(Schedulers.elastic()).

这是此类订阅者的一个非常基本的实现:

class PipedStreamSubscriber extends BaseSubscriber<byte[]> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final PipedInputStream in;
    private PipedOutputStream out;

    PipedStreamSubscriber(PipedInputStream in) {
        Objects.requireNonNull(in, "The input stream must not be null");
        this.in = in;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        //change if you want to control back-pressure
        super.hookOnSubscribe(subscription);
        try {
            this.out = new PipedOutputStream(in);
        } catch (IOException e) {
            //TODO throw a contextual exception here
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void hookOnNext(byte[] payload) {
        try {
            out.write(payload);
        } catch (IOException e) {
            //TODO throw a contextual exception here
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void hookOnComplete() {
        close();
    }

    @Override
    protected void hookOnError(Throwable error) {
        //TODO handle the error or at least log it
        logger.error("Failure processing stream", error);
        close();
    }

    @Override
    protected void hookOnCancel() {
        close();
    }

    private void close() {
        try {
            if (out != null) {
                out.close();
            }
        } catch (IOException e) {
            //probably just ignore this one or simply  log it
        }
    }
}

使用这个订阅者,我可以定义一个非常简单的实用方法,将 Flux<byte[] 变成 InputStream,如下所示:

static InputStream createInputStream(Flux<byte[]> flux) {

    PipedInputStream in = new PipedInputStream();
    flux.subscribeOn(Schedulers.elastic())
        .subscribe(new PipedStreamSubscriber(in));

    return in;
}

请注意,当流程完成、发生错误或取消订阅时,我特别小心地关闭了输出流,否则我们 运行 有在读取端阻塞的风险,等待更多输入到达。关闭输出流是管道另一侧输入流结束的信号。

现在 InputStream 可以像任何常规流一样使用,因此您可以将它传递给非反应性方法,例如

Flux<byte[]> jedi = Flux.just("Luke\n", "Obi-Wan\n", "Yoda\n").map(String::getBytes);

try (InputStream in = createInputStream(jedi)) {
    byte[] data = new byte[5];
    int size = 0;
    while ((size = in.read(data)) > 0) {
        System.out.printf("%s", new String(data, 0, size));
    }
} 

上面的代码产生:

Luke
Obi-Wan
Yoda

您可以将Flux<DataBuffer>减少到Mono<DataBuffer>,然后转移到InputStream

在WebFlux中上传文件到GridFs的示例代码:

    private GridFsTemplate gridFsTemplate;

    public Mono<String> storeFile(FilePart filePart) {
        HttpHeaders headers = filePart.headers();
        String contentType = Objects.requireNonNull(headers.getContentType()).toString();

        return filePart.content()
                .reduce(DataBuffer::write).map(DataBuffer::asInputStream)
                .map(input -> gridFsTemplate.store(input, filePart.filename(), contentType))
                .map(ObjectId::toHexString);
    }

Edwin 的回答对我没有用,因为上游的错误被订阅者吞没并且没有传播到 InputStream 的消费者。尽管如此,受埃德温回答的启发,我找到了不同的解决方案。这是一个使用 Flux<ByteArray> 并将其作为 InputStream 下游传递的示例。该示例包括解密以突出显示即使在 Flux<ByteStream> 完全消耗后仍可能操纵 OutputStream 的可能性,最终产生一个错误并传播到下游。

fun decryptAndGetInputStream(flux: Flux<ByteArray>, cipher: Cipher): Flux<InputStream> {
    val inputStream = PipedInputStream()
    val outputStream = PipedOutputStream(inputStream)
    val isStreamEmitted = AtomicBoolean(false)
    
    return flux.handle<InputStream> { byteArray, sink ->
        try {
            outputStream.write(cipher.update(byteArray))
            // emit the input stream as soon as we get the first chunk of bytes
            // make sure we do it only once
            if (!isStreamEmitted.getAndSet(true)) {
                sink.next(inputStream)
            }
        } catch (e: Exception) {
            // catch all errors to pass them to the sink
            sink.error(e)
        }
    }.doOnComplete { 
        // here we have a last chance to throw an error  
        outputStream.write(cipher.doFinal())
    }.doOnTerminate {
        // error thrown here won't get propagated downstream
        // since this callback is triggered after flux's completion 
        outputStream.flush()
        outputStream.close()
    }
}

这里的问题是使用 handle 运算符生成一个 Flux 最多发出一个项目。与 Mono 不同,Flux 不会在第一次发射后立即终止。虽然它不会再发射任何项目,但它会保持“打开”状态以发射在第一次发射后发生的最终错误。

下面是一个使用 Flux<InputStream> 并将其转换为 Mono 的示例。

fun decryptAndGetProcessingResult(flux: Flux<ByteArray>, cipher: Cipher): Mono<Result> =
    decryptAndGetInputStream(flux, cipher)
        // the following operator gets called at most once
        .flatMap { inputStream ->
            // wrap the blocking operation into mono
            // subscribed on another thread to avoid deadlocks
            Mono.fromCallable { 
                processInputStream(inputStream)
            }.subscribeOn(Schedulers.elastic())
        // to get mono out of flux we implement reduce operator
        // although it gets never called
        }.reduce { t, _ -> t }

这里的另一个优点是使用 InputStream 的线程在第一个数据块可用之前不会阻塞。