使用 RestTemplate 获取 InputStream

Getting InputStream with RestTemplate

我正在使用 URL class 从中读取 InputStream。有什么方法可以为此使用 RestTemplate 吗?

InputStream input = new URL(url).openStream();
JsonReader reader = new JsonReader(new InputStreamReader(input, StandardCharsets.UTF_8.displayName())); 

如何使用 RestTemplate 而不是使用 URL 获得 InputStream

你不应该直接得到 InputStreamRestTemplate 是为了封装处理响应(和请求)的内容。它的优势在于处理所有 IO 并为您提供一个随时可用的 Java 对象。

RestTemplate 的原作者之一 Brian Clozel stated:

RestTemplate is not meant to stream the response body; its contract doesn't allow it, and it's been around for so long that changing such a basic part of its behavior cannot be done without disrupting many applications.

您需要注册适当的 HttpMessageConverter objects. Those will have access to the response's InputStream, through an HttpInputMessage 对象。

, Spring does come with an HttpMessageConverter implementation for Resource which itself wraps an InputStream, ResourceHttpMessageConverter。它不支持所有 Resource 类型,但由于您无论如何都应该针对接口进行编程,因此您应该只使用超级接口 Resource.

当前的实现 (4.3.5),将 return 一个 ByteArrayResource 与响应流的内容复制到一个新的 ByteArrayInputStream 您可以访问。

您不必关闭流。 RestTemplate 会为您处理。 (如果您尝试使用 InputStreamResourceResourceHttpMessageConverter 支持的另一种类型,这是不幸的,因为它包装了底层响应的 InputStream,但在它可以暴露给您的客户端代码之前被关闭.)

作为变体,您可以将响应作为字节使用,而不是转换为流

byte data[] = restTemplate.execute(link, HttpMethod.GET, null, new BinaryFileExtractor());
return new ByteArrayInputStream(data);

提取器是

public class BinaryFileExtractor implements ResponseExtractor<byte[]> {

  @Override
  public byte[] extractData(ClientHttpResponse response) throws IOException {
    return ByteStreams.toByteArray(response.getBody());
  }
}

Spring 有一个 org.springframework.http.converter.ResourceHttpMessageConverter。它转换 Spring 的 org.springframework.core.io.Resource class。 Resourceclass封装了一个InputStream,可以通过someResource.getInputStream().

获取

综上所述,您实际上可以通过将 Resource.class 指定为 RestTemplate 调用的响应类型来开箱即用地通过 RestTemplate 获得 InputStream .

这是一个使用 RestTemplateexchange(..) 方法之一的示例:

import org.springframework.web.client.RestTemplate;
import org.springframework.http.HttpMethod;
import org.springframework.core.io.Resource;

ResponseEntity<Resource> responseEntity = restTemplate.exchange( someUrlString, HttpMethod.GET, someHttpEntity, Resource.class );

InputStream responseInputStream;
try {
    responseInputStream = responseEntity.getBody().getInputStream();
}
catch (IOException e) {
    throw new RuntimeException(e);
}

// use responseInputStream

前面的回答都没有错,但是没有深入到我喜欢看到的深度。在某些情况下,处理低级别 InputStream 不仅是可取的,而且是必要的,最常见的示例是将大文件从源(某些 Web 服务器)流式传输到目标(数据库)。如果您尝试使用 ByteArrayInputStream,您会看到 OutOfMemoryError,这并不奇怪。是的,您可以推出自己的 HTTP 客户端代码,但您必须处理错误的响应代码、响应转换器等。如果您已经在使用 Spring,寻找 RestTemplate 是一个自然的选择。

在撰写本文时,spring-web:5.0.2.RELEASE 有一个 ResourceHttpMessageConverter 有一个 boolean supportsReadStreaming,如果已设置,响应类型为 InputStreamResource、returns InputStreamResource;否则它 returns 一个 ByteArrayResource。很明显,您不是唯一一个要求流媒体支持的人。

但是,有一个问题:RestTemplateHttpMessageConverter 运行后不久就关闭了响应。因此,即使你请求 InputStreamResource,并且得到了它,也没有用,因为响应流已经关闭。我认为这是他们忽略的设计缺陷;它应该取决于响应类型。所以不幸的是,为了阅读,你必须充分消耗反应;如果使用 RestTemplate.

就不能传递它

写字没问题。如果您想流式传输 InputStreamResourceHttpMessageConverter 会为您完成。在引擎盖下,它使用 org.springframework.util.StreamUtilsInputStreamOutputStream.

一次写入 4096 个字节

一些 HttpMessageConverter 支持所有媒体类型,因此根据您的要求,您可能需要从 RestTemplate 中删除默认的媒体类型,并设置您需要的媒体类型,同时注意它们相对顺序。

最后但并非最不重要的是,ClientHttpRequestFactory 的实现有一个 boolean bufferRequestBody,如果您要上传大型流,您可以并且应该将其设置为 false。否则,你知道的,OutOfMemoryError。在撰写本文时,SimpleClientHttpRequestFactory(JDK 客户端)和 HttpComponentsClientHttpRequestFactory(Apache HTTP 客户端)支持此功能,但不支持 OkHttp3ClientHttpRequestFactory。再次,设计疏忽。

编辑: 已提交工单 SPR-16885.

感谢 Abhijit Sarkar 的回答带路。

我需要下载一个繁重的 JSON 流并将其分解成小的可流式管理的数据块。 JSON 由具有大属性的对象组成:这样的大属性可以序列化到文件中,从而从未编组的 JSON 对象中删除。

另一个用例是按对象下载 JSON 流对象,像 map/reduce 算法一样处理它并生成单个输出,而无需将整个流加载到内存中。

另一个用例是读取一个大 JSON 文件并根据条件只选择几个对象,同时解组为普通旧 Java 对象。

这是一个示例:我们想要流式传输一个非常大的 JSON 数组文件,并且我们只想检索数组中的第一个对象。

鉴于服务器上有这个大文件,可在 http://example.org/testings.json 获得:

[
    { "property1": "value1", "property2": "value2", "property3": "value3" },
    { "property1": "value1", "property2": "value2", "property3": "value3" },
    ... 1446481 objects => a file of 104 MB => take quite long to download...
]

这个JSON数组的每一行都可以解析为这个对象:

@lombok.Data
public class Testing {
    String property1;
    String property2;
    String property3;
}

您需要这个 class 使解析代码可重用:

import com.fasterxml.jackson.core.JsonParser;
import java.io.IOException;
@FunctionalInterface
public interface JsonStreamer<R> {
    /**
     * Parse the given JSON stream, process it, and optionally return an object.<br>
     * The returned object can represent a downsized parsed version of the stream, or the result of a map/reduce processing, or null...
     *
     * @param jsonParser the parser to use while streaming JSON for processing
     * @return the optional result of the process (can be {@link Void} if processing returns nothing)
     * @throws IOException on streaming problem (you are also strongly encouraged to throw HttpMessageNotReadableException on parsing error)
     */
    R stream(JsonParser jsonParser) throws IOException;
}

要解析的class:

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import lombok.AllArgsConstructor;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

@AllArgsConstructor
public class StreamingHttpMessageConverter<R> implements HttpMessageConverter<R> {

    private final JsonFactory factory;
    private final JsonStreamer<R> jsonStreamer;

    @Override
    public boolean canRead(Class<?> clazz, MediaType mediaType) {
        return MediaType.APPLICATION_JSON.isCompatibleWith(mediaType);
    }

    @Override
    public boolean canWrite(Class<?> clazz, MediaType mediaType) {
        return false; // We only support reading from an InputStream
    }

    @Override
    public List<MediaType> getSupportedMediaTypes() {
        return Collections.singletonList(MediaType.APPLICATION_JSON);
    }

    @Override
    public R read(Class<? extends R> clazz, HttpInputMessage inputMessage) throws IOException {
        try (InputStream inputStream = inputMessage.getBody();
             JsonParser parser = factory.createParser(inputStream)) {
            return jsonStreamer.stream(parser);
        }
    }

    @Override
    public void write(R result, MediaType contentType, HttpOutputMessage outputMessage) {
        throw new UnsupportedOperationException();
    }

}

然后,这里是用于流式传输 HTTP 响应、解析 JSON 数组和 return 仅第一个未编组对象的代码:

// You should @Autowire these:
JsonFactory jsonFactory = new JsonFactory();
ObjectMapper objectMapper = new ObjectMapper();
RestTemplateBuilder restTemplateBuilder = new RestTemplateBuilder();

// If detectRequestFactory true (default): HttpComponentsClientHttpRequestFactory will be used and it will consume the entire HTTP response, even if we close the stream early
// If detectRequestFactory false: SimpleClientHttpRequestFactory will be used and it will close the connection as soon as we ask it to

RestTemplate restTemplate = restTemplateBuilder.detectRequestFactory(false).messageConverters(
    new StreamingHttpMessageConverter<>(jsonFactory, jsonParser -> {

        // While you use a low-level JsonParser to not load everything in memory at once,
        // you can still profit from smaller object mapping with the ObjectMapper
        if (!jsonParser.isClosed() && jsonParser.nextToken() == JsonToken.START_ARRAY) {
            if (!jsonParser.isClosed() && jsonParser.nextToken() == JsonToken.START_OBJECT) {
                return objectMapper.readValue(jsonParser, Testing.class);
            }
        }
        return null;

    })
).build();

final Testing firstTesting = restTemplate.getForObject("http://example.org/testings.json", Testing.class);
log.debug("First testing object: {}", firstTesting);

您可以传入自己的响应提取器。这是一个示例,我以流方式将 json 写入磁盘 -

        RestTemplate restTemplate = new RestTemplateBuilder().basicAuthentication("user", "their_password" ).build();

        int responseSize = restTemplate.execute(uri,
            HttpMethod.POST,
            (ClientHttpRequest requestCallback) -> {
                requestCallback.getHeaders().setContentType(MediaType.APPLICATION_JSON);
                requestCallback.getBody().write(body.getBytes());
            },
            responseExtractor -> {
                FileOutputStream fos  = new FileOutputStream(new File("out.json"));
                return StreamUtils.copy(responseExtractor.getBody(), fos);
            }
    )

我遇到了同样的问题,通过扩展 RestTemplate 并仅在读取流后关闭连接来解决。

你可以在这里看到代码:https://github.com/ItamarBenjamin/stream-rest-template

非常简单但有效的解决方案是使用 ResponseExtractor。当您想在非常大的 InputStream 上操作并且您的 RAM 有限时,它特别有用。

以下是您应该如何实施它:

public void consumerInputStreamWithoutBuffering(String url, Consumer<InputStream> streamConsumer) throws IOException {

    final ResponseExtractor responseExtractor =
            (ClientHttpResponse clientHttpResponse) -> {
                streamConsumer.accept(clientHttpResponse.getBody());
                return null;
            };

    restTemplate.execute(url, HttpMethod.GET, null, responseExtractor);
}

然后,在任何需要的地方调用该方法:

Consumer<InputStream> doWhileDownloading = inputStream -> {
                //Use inputStream for your business logic...
};

consumerInputStreamWithoutBuffering("https://localhost.com/download", doWhileDownloading);

请注意以下常见陷阱

public InputStream getInputStreamFromResponse(String url) throws IOException {

    final ResponseExtractor<InputStream> responseExtractor =
            clientHttpResponse -> clientHttpResponse.getBody();

    return restTemplate.execute(url, HttpMethod.GET, null, responseExtractor);
}

这里InputStream将在您访问之前关闭

我就是这样解决的。 希望对大家有所帮助

    @GetMapping("largeFile")
    public ResponseEntity<InputStreamResource> downloadLargeFile(
            @RequestParam("fileName") String fileName
    ) throws IOException {

        RestTemplate restTemplate = new RestTemplate();

        // Optional Accept header
        RequestCallback requestCallback = request -> request.getHeaders()
                .setAccept(Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL));

        // Streams the response instead of loading it all in memory
        ResponseExtractor<InputStreamResource> responseExtractor = response -> {
            // Here I write the response to a file but do what you like
            Path path = Paths.get("tmp/" + fileName);
            Files.copy(response.getBody(), path, StandardCopyOption.REPLACE_EXISTING);
            return new InputStreamResource(new FileInputStream(String.format("tmp/%s", fileName)));
        };

        InputStreamResource response = restTemplate.execute(
            String.format("http://%s:%s/file/largeFileRestTemplate?fileName=%s", host, "9091", fileName),
            HttpMethod.GET,
            requestCallback,
            responseExtractor
        );

        return ResponseEntity
            .ok()
            .header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment; filename=%s", fileName))
            .body(response);
    }