如何使用 Spring Boot @RestController 流式传输分块响应
How To Stream Chunked Response With Spring Boot @RestController
我在这上面花了一天的时间,但找不到有效的解决方案。在我们的应用程序中,我们有几个可以 return 大响应的端点。我一直在努力寻找一种机制,使我们能够在处理数据库查询结果时流式传输响应。主要目标是限制服务端的峰值内存使用(不需要内存中的整个响应)并最小化响应第一个字节的时间(如果响应没有开始进入,则客户端系统会超时)指定时间 - 10 分钟)。我真的很惊讶这太难了。
我找到了 StreamingResponseBody,它似乎接近我们想要的,虽然我们并不真正需要异步方面,但我们只希望能够在处理查询结果时开始流式传输响应。我也尝试过其他方法,例如使用 @ResponseBody 注释、returning void 以及添加 OutputStream 的参数,但这没有用,因为传递的 OutputStream 基本上只是缓冲整个结果的 CachingOutputStream。这是我现在拥有的...
资源方法:
@GetMapping(value = "/catalog/features")
public StreamingResponseBody findFeatures(
@RequestParam("provider-name") String providerName,
@RequestParam(name = "category", required = false) String category,
@RequestParam("date") String date,
@RequestParam(value = "version-state", defaultValue = "*") String versionState) {
CatalogVersionState catalogVersionState = getCatalogVersionState(versionState);
log.info("GET - Starting DB query...");
final List<Feature> features
= featureService.findFeatures(providerName,
category,
ZonedDateTime.parse(date),
catalogVersionState);
log.info("GET - Query done!");
return new StreamingResponseBody() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
log.info("GET - Transforming DTOs");
JsonFactory jsonFactory = new JsonFactory();
JsonGenerator jsonGenerator = jsonFactory.createGenerator(outputStream);
Map<Class<?>, JsonSerializer<?>> serializerMap = new HashMap<>();
serializerMap.put(DetailDataWrapper.class, new DetailDataWrapperSerializer());
serializerMap.put(ZonedDateTime.class, new ZonedDateTimeSerializer());
ObjectMapper jsonMapper = Jackson2ObjectMapperBuilder.json()
.serializersByType(serializerMap)
.deserializerByType(ZonedDateTime.class, new ZonedDateTimeDeserializer())
.build();
jsonGenerator.writeStartArray();
for (Feature feature : features) {
FeatureDto dto = FeatureMapper.MAPPER.featureToFeatureDto(feature);
jsonMapper.writeValue(jsonGenerator, dto);
jsonGenerator.flush();
}
jsonGenerator.writeEndArray();
log.info("GET - DTO transformation done!");
}
};
}
异步配置:
@Configuration
@EnableAsync
@EnableScheduling
public class ProductCatalogStreamingConfig extends WebMvcConfigurerAdapter {
private final Logger log = LoggerFactory.getLogger(ProductCatalogStreamingConfig.class);
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setDefaultTimeout(360000).setTaskExecutor(getAsyncExecutor());
configurer.registerCallableInterceptors(callableProcessingInterceptor());
}
@Bean(name = "taskExecutor")
public AsyncTaskExecutor getAsyncExecutor() {
log.debug("Creating Async Task Executor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("AsyncStreaming-");
return executor;
}
@Bean
public CallableProcessingInterceptor callableProcessingInterceptor() {
return new TimeoutCallableProcessingInterceptor() {
@Override
public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws
Exception {
log.error("timeout!");
return super.handleTimeout(request, task);
}
};
}
}
我原以为客户端会在 StreamingResponseBody.writeTo() 被调用后立即开始看到响应,并且响应 headers 将包含
Content-Encoding: chunked
但不是
Content-Length: xxxx
相反,在 StreamingResponseBody.writeTo() 已 returned 并且响应包含 Content-Length 之前,我没有在客户端看到任何响应。 (但不是 Content-Encoding)
我的问题是,当我在 writeTo() 中写入 OutputStream 时告诉 Spring 发送分块响应而不缓存整个负载并仅在最后发送它的秘诀是什么?具有讽刺意味的是,我发现一些帖子想知道如何禁用分块编码,但对启用它一无所知。
事实证明,上面的代码完全符合我们的要求。我们观察到的行为不是由于 Spring 实现这些功能的方式造成的,它是由公司特定的启动器引起的,该启动器安装了干扰正常 Spring 行为的 servlet 过滤器。这个过滤器包装了 HttpServletResponse OutputStream,这就是我们观察到问题中提到的 CachingOutputStream 的原因。删除启动程序后,上面的代码完全按照我们希望的方式运行,我们正在以不会干扰此行为的方式重新实现 servlet 过滤器。
我在这上面花了一天的时间,但找不到有效的解决方案。在我们的应用程序中,我们有几个可以 return 大响应的端点。我一直在努力寻找一种机制,使我们能够在处理数据库查询结果时流式传输响应。主要目标是限制服务端的峰值内存使用(不需要内存中的整个响应)并最小化响应第一个字节的时间(如果响应没有开始进入,则客户端系统会超时)指定时间 - 10 分钟)。我真的很惊讶这太难了。
我找到了 StreamingResponseBody,它似乎接近我们想要的,虽然我们并不真正需要异步方面,但我们只希望能够在处理查询结果时开始流式传输响应。我也尝试过其他方法,例如使用 @ResponseBody 注释、returning void 以及添加 OutputStream 的参数,但这没有用,因为传递的 OutputStream 基本上只是缓冲整个结果的 CachingOutputStream。这是我现在拥有的...
资源方法:
@GetMapping(value = "/catalog/features")
public StreamingResponseBody findFeatures(
@RequestParam("provider-name") String providerName,
@RequestParam(name = "category", required = false) String category,
@RequestParam("date") String date,
@RequestParam(value = "version-state", defaultValue = "*") String versionState) {
CatalogVersionState catalogVersionState = getCatalogVersionState(versionState);
log.info("GET - Starting DB query...");
final List<Feature> features
= featureService.findFeatures(providerName,
category,
ZonedDateTime.parse(date),
catalogVersionState);
log.info("GET - Query done!");
return new StreamingResponseBody() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
log.info("GET - Transforming DTOs");
JsonFactory jsonFactory = new JsonFactory();
JsonGenerator jsonGenerator = jsonFactory.createGenerator(outputStream);
Map<Class<?>, JsonSerializer<?>> serializerMap = new HashMap<>();
serializerMap.put(DetailDataWrapper.class, new DetailDataWrapperSerializer());
serializerMap.put(ZonedDateTime.class, new ZonedDateTimeSerializer());
ObjectMapper jsonMapper = Jackson2ObjectMapperBuilder.json()
.serializersByType(serializerMap)
.deserializerByType(ZonedDateTime.class, new ZonedDateTimeDeserializer())
.build();
jsonGenerator.writeStartArray();
for (Feature feature : features) {
FeatureDto dto = FeatureMapper.MAPPER.featureToFeatureDto(feature);
jsonMapper.writeValue(jsonGenerator, dto);
jsonGenerator.flush();
}
jsonGenerator.writeEndArray();
log.info("GET - DTO transformation done!");
}
};
}
异步配置:
@Configuration
@EnableAsync
@EnableScheduling
public class ProductCatalogStreamingConfig extends WebMvcConfigurerAdapter {
private final Logger log = LoggerFactory.getLogger(ProductCatalogStreamingConfig.class);
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setDefaultTimeout(360000).setTaskExecutor(getAsyncExecutor());
configurer.registerCallableInterceptors(callableProcessingInterceptor());
}
@Bean(name = "taskExecutor")
public AsyncTaskExecutor getAsyncExecutor() {
log.debug("Creating Async Task Executor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("AsyncStreaming-");
return executor;
}
@Bean
public CallableProcessingInterceptor callableProcessingInterceptor() {
return new TimeoutCallableProcessingInterceptor() {
@Override
public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws
Exception {
log.error("timeout!");
return super.handleTimeout(request, task);
}
};
}
}
我原以为客户端会在 StreamingResponseBody.writeTo() 被调用后立即开始看到响应,并且响应 headers 将包含
Content-Encoding: chunked
但不是
Content-Length: xxxx
相反,在 StreamingResponseBody.writeTo() 已 returned 并且响应包含 Content-Length 之前,我没有在客户端看到任何响应。 (但不是 Content-Encoding)
我的问题是,当我在 writeTo() 中写入 OutputStream 时告诉 Spring 发送分块响应而不缓存整个负载并仅在最后发送它的秘诀是什么?具有讽刺意味的是,我发现一些帖子想知道如何禁用分块编码,但对启用它一无所知。
事实证明,上面的代码完全符合我们的要求。我们观察到的行为不是由于 Spring 实现这些功能的方式造成的,它是由公司特定的启动器引起的,该启动器安装了干扰正常 Spring 行为的 servlet 过滤器。这个过滤器包装了 HttpServletResponse OutputStream,这就是我们观察到问题中提到的 CachingOutputStream 的原因。删除启动程序后,上面的代码完全按照我们希望的方式运行,我们正在以不会干扰此行为的方式重新实现 servlet 过滤器。