Flowable 使用 RxJava ReactiveX 执行任务和 return 值

Flowable to perform task and return value using RxJava ReactiveX

我有下面的方法来检查文件格式是否正确,如果正确,它会添加到 requestBody,否则,它应该向该文件的客户端抛出一条错误消息格式无效。

public Maybe<HttpResponse<?>> post(Publisher<CompletedFileUpload> images) {
        return Flowable.fromPublisher(images)
                .collect(MultipartBody::builder, (requestBody, file) -> {
                    if (new FileExtension().fileExtensionValidation(file.getFilename())) {
                        requestBody
                                .addPart("images", file.getFilename(), MediaType.TEXT_PLAIN_TYPE, file.getBytes());
                    }
                })
                .flatMapMaybe(requestBody -> {
                    if (true)
                        return iImageUploadClient.post(requestBody.build());
                    else
                        return Maybe.just(HttpResponse.serverError("Image file extension invalid, should ne .png, .jpg, .jpeg,.gif"));
                });
    }

此代码检查文件格式 if (new FileExtension().fileExtensionValidation(file.getFilename())) 失败时应 return 一条消息 return Maybe.just(HttpResponse.serverError("Image file extension invalid, should ne .png, .jpg, .jpeg,.gif"));

我写的 if (true) 总是正确的,相反我需要在这里检查或者我如何 return 从 .collect() 功能。我怎样才能用反应 java

做到这一点

您可以在收集和最后一个 flatMapMaybe 之间使用共享的 AtomicBoolean。此外,如果你想在那里停止图像,抛出一个异常并将它变成一个中性的多部分体,这样 flatMapMaybe 仍然可以运行。

public Maybe<HttpResponse<?>> post(Publisher<CompletedFileUpload> images) {
    return Maybe.<HttpResponse<?>>defer(() -> {
        AtomicBoolean formatError = new AtomicBoolean();
        return Flowable.fromPublisher(images)
                .collect(MultipartBody::builder, (requestBody, file) -> {
                    if (new FileExtension().fileExtensionValidation(file.getFilename())) {
                        requestBody
                                .addPart("images", file.getFilename(),
                                     MediaType.TEXT_PLAIN_TYPE, file.getBytes());
                    } else {
                        formatError.set(true);
                        throw new CancellationException();
                    }
                })
                .onErrorResumeNext(error -> {
                     if (error instanceof CancellationException) {
                         return Single.just(MultipartBody.builder());
                     }
                     return Single.error(error);
                })
                .flatMapMaybe(requestBody -> {
                    if (!formatError.get())
                        return iImageUploadClient.post(requestBody.build());
                    else
                        return Maybe.just(HttpResponse.serverError(
                            "Image file extension invalid, should ne .png, .jpg, .jpeg,.gif"));
                });
    });
}