使用 ResponseEntity 并确保关闭 InputStream 的正确流式传输方式

Proper way of streaming using ResponseEntity and making sure the InputStream gets closed

我们的一个应用程序泄漏了文件句柄,我们还没有找到原因。

在代码中我可以看到几个类似的函数:

public ResponseEntity<InputStreamResource> getFoo( ... ) {
    InputStream content = getContent(...)
    InputStreamResource isr = new InputStreamResource(content);
    return ResponseEntity.status(HttpServletResponse.SC_OK).body(isr);
}

if 检查并为简洁起见删除了 try / catch

我确信这部分会导致问题,因为当我用 JMeter 加载测试这个特定代码时,我可以看到 getContent() 在这个阶段失败了:

is = Files.newInputStream(f.toPath());

通常我会关闭 InputStream 但因为这个简短的代码我无法在 returnbody.

的调用之前关闭流

当我 运行 lsof(代码 运行s on Linux)时,我可以看到成千上万的文件以读取模式打开。所以我确定这个问题是由于流没有关闭引起的。

是否有我应该交易的最佳实践代码?

因为这个 InputStream 基本上来自一个简单的文件,一个很好的替代是这个代码:

FileSystemResource fsr = new FileSystemResource(fileName);
return ResponseEntity.status(HttpServletResponse.SC_OK).body(fsr);

FileSystemResource 可以使用 java.util.Filejava.nio.file.Path 甚至 String 指向相关文件。

您可以重构所有读取本地文件并将其内容设置为 HTTP 响应主体的控制器方法:

而不是使用 ResponseEntity 方法,而是注入底层 HttpServletResponse 并将从 getContent(...) 方法返回的输入流的字节复制到 HttpServletResponse 的输出流,例如通过使用 Apache CommonsIO 或 Google Guava 库的 IO 相关实用方法。在任何情况下,请确保关闭输入流!下面的代码通过使用 'try-with-resources' 语句隐含地执行此操作,该语句在语句末尾关闭声明的输入流。

@RequestMapping(value="/foo", method=RequestMethod.GET)
public void getFoo(HttpServletResponse response) {
    // use Java7+ try-with-resources
    try (InputStream content = getContent(...)) {

        // if needed set content type and attachment header
        response.addHeader("Content-disposition", "attachment;filename=foo.txt");
        response.setContentType("txt/plain");

        // copy content stream to the HttpServletResponse's output stream
        IOUtils.copy(myStream, response.getOutputStream());

        response.flushBuffer();
    }
}

参考:

https://docs.oracle.com/javase/7/docs/api/java/io/InputStream.html https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html https://google.github.io/guava/releases/19.0/api/docs/com/google/common/io/ByteStreams.html https://commons.apache.org/proper/commons-io/javadocs/api-release/index.html

(特别看看 class org.apache.commons.io.IOUtils 的方法 public static int copy(InputStream input, OutputStream output) throws IOExceptionpublic static int copyLarge(InputStream input, OutputStream output) throws IOException

假设您使用的是 Spring,您的方法可以 return 和 Resource and let Spring handle the rest (including closing underlying stream). There are few implementations of Resource are available within Spring API,否则您需要实现自己的方法。最后,你的方法会变得简单,就像下面这样

public ResponseEntity<Resource> getFo0(...) {
    return new InputStreamResource(<Your input stream>);
}

你可以尝试使用 StreamingResponseBody

StreamingResponseBody

A controller method return value type for asynchronous request processing where the application can write directly to the response OutputStream without holding up the Servlet container thread.

因为您在一个单独的线程上工作,直接写入响应,所以您在 return 之前调用 close() 的问题已解决。

或许你可以从下面的例子开始

public ResponseEntity<StreamingResponseBody> export(...) throws FileNotFoundException {
    //...

    InputStream inputStream = new FileInputStream(new File("/path/to/example/file"));


    StreamingResponseBody responseBody = outputStream -> {

        int numberOfBytesToWrite;
        byte[] data = new byte[1024];
        while ((numberOfBytesToWrite = inputStream.read(data, 0, data.length)) != -1) {
            System.out.println("Writing some bytes..");
            outputStream.write(data, 0, numberOfBytesToWrite);
        }

        inputStream.close();
    };

    return ResponseEntity.ok()
            .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=generic_file_name.bin")
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .body(responseBody);
}

你也可以尝试使用Files(因为java 7)

因此您无需管理 InputStream

    File file = new File("/path/to/example/file");

    StreamingResponseBody responseBody = outputStream -> {
        Files.copy(file.toPath(), outputStream);
    };

正如@Stackee007 在评论中描述的那样,在生产环境的重负载下,为 TaskExecutor 定义 @Configuration class 以调整参数和管理 Async 个进程。

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {

    private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);

    private final TaskExecutionProperties taskExecutionProperties;

    public AsyncConfiguration(TaskExecutionProperties taskExecutionProperties) {
        this.taskExecutionProperties = taskExecutionProperties;
    }

    //  ---------------> Tune parameters here
    @Override
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor() {
        log.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(taskExecutionProperties.getPool().getCoreSize());
        executor.setMaxPoolSize(taskExecutionProperties.getPool().getMaxSize());
        executor.setQueueCapacity(taskExecutionProperties.getPool().getQueueCapacity());
        executor.setThreadNamePrefix(taskExecutionProperties.getThreadNamePrefix());
        return executor;
    }
    
    //  ---------------> Use this task executor also for async rest methods
    @Bean
    protected WebMvcConfigurer webMvcConfigurer() {
        return new WebMvcConfigurer() {
            @Override
            public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
                configurer.setTaskExecutor(getTaskExecutor());
            }
        };
    }

    @Bean
    protected ConcurrentTaskExecutor getTaskExecutor() {
        return new ConcurrentTaskExecutor(this.getAsyncExecutor());
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

如何使用 mockMvc 进行测试

您可以在集成测试中简单地遵循此示例代码:

    .andExpect(request().asyncStarted())
    .andDo(MvcResult::getAsyncResult)
    .andExpect(status().isOk()).getResponse().getContentAsByteArray();

ResponseEntity<StreamingResponseBody> 的内容类型在此示例中是 MediaType.APPLICATION_OCTET_STREAM,您可以获得 byte[] (.getContentAsByteArray()),但您可以获得 String/Json/plaintext 的所有内容,具体取决于您的 body 响应内容类型。