在 Micronaut 控制器中流式传输大量响应而不会耗尽内存

Stream large response in Micronaut controller without going out of memory

我们将 Micronaut 与 Mongo 结合使用,通过一些控制器公开数据。由于响应实体的大小不断增加,我们的应用程序有时会出现内存不足的情况。因此,我们正在研究切换到异步 mongo 驱动程序并使用反应式响应将数据流式传输到客户端。很遗憾,我们无法更改 API 响应结构或内容类型(所有 application/json

我们的 API 个 returned 实体结构如下:

  { "field": "value" },
  { "field": "value" },
  { "field": "value" }

我们开始使用这个控制器工作,其中 dataStore return 是 Publisher<Example>:

    Flowable<Example> getAllExamples() {
        return Flowable.fromPublisher(dataStore.find()).map(SomeMapper::toPublic);


其他APIs return(imo更明智的)结构:

  "list": [
    { "field": "value" },
    { "field": "value" },
    { "field": "value" }
  "meta": {

我们能否为这样的实体应用类似的 publisher/flowable 模式,或者在将此类响应发送出去之前将其加载到内存中?


    Single<ExamplesWrapper> getAllDev() {
        Publisher<Example> dev = dataStore.find();
        return Flowable.fromPublisher(dev)
                .collect((Callable<ArrayList<Example>>) ArrayList::new, ArrayList::add)


将 Flowable 添加到响应包装器中:

public class ExamplesWrapper {

    private final Flowable<Example> examples;

    public ExamplesWrapper(Flowable<Example> examples) {
        this.examples = examples;

    public Flowable<Example> getExamples() {
        return examples;

也因一些不错的 Jackson 映射异常而失败。



6.20 Writing Response Data

Reactively Writing Response Data

Micronaut’s HTTP server supports writing chunks of response data by returning a Publisher that emits objects that can be encoded to the HTTP response.

The following table summarizes example return type signatures and the behaviour the server exhibits to handle them: Return Type Description

  • Flowable<byte[]>: A Flowable that emits each chunk of content as a byte[] without blocking
  • Flux<ByteBuf>: A Reactor Flux that emits each chunk as a Netty ByteBuf
  • Publisher<String>: A Publisher that emits each chunk of content as a String
  • Flowable<Book> When emitting a POJO, each emitted object is encoded as JSON by default without blocking

When returning a reactive type, the server uses a Transfer-Encoding of chunked and keeps writing data until the Publisher onComplete method is called.

我明白这一点,所以如果你想让 Micronaut 机制流式传输你的东西,你需要有像 Flowable<item>Flux<item>Publisher<item> 这样的签名,其中项目是一个块您的回复,而不是完整的项目。然后 Micronaut 将响应来自 Flowable 或等效的块。



public Flowable<String> getAllExamples() {
    ObjectMapper objectMapper = new ObjectMapper();
    Publisher<Example> dev = dataStore.find();
    return Flowable.fromPublisher(dev)
            .concatMap(item -> Flowable.just(objectMapper.writeValueAsString(item), ","))
            .startWith("{\"list\": [")



我确实测试了在自定义 Jackson 映射器中直接写入 JsonGenerator,按照 jackson streaming api, but micronaut RoutingInboundHandler 中概述的那样刷新对象似乎不会将响应刷新回最终用户,而是对其进行缓冲,从而导致记忆。方法适用于 Spring Boot,因此它可能是 Micronaut 中缺少的功能。

我在使用 Micronaut 时也发生了同样的缓冲 Writeable (blocking) responses and trying to flush data as it is written. I opened an issue about that to micronaut core