在 Micronaut 控制器中流式传输大量响应而不会耗尽内存

Stream large response in Micronaut controller without going out of memory

我们将 Micronaut 与 Mongo 结合使用,通过一些控制器公开数据。由于响应实体的大小不断增加,我们的应用程序有时会出现内存不足的情况。因此,我们正在研究切换到异步 mongo 驱动程序并使用反应式响应将数据流式传输到客户端。很遗憾,我们无法更改 API 响应结构或内容类型(所有 application/json

我们的 API 个 returned 实体结构如下:

[
  { "field": "value" },
  { "field": "value" },
  ...
  { "field": "value" }
]

我们开始使用这个控制器工作,其中 dataStore return 是 Publisher<Example>:

    @Get("all")
    Flowable<Example> getAllExamples() {
        return Flowable.fromPublisher(dataStore.find()).map(SomeMapper::toPublic);
    }

这很好用,在将大量示例流式传输到客户端之前,不必将其完全加载到内存中。

其他APIs return(imo更明智的)结构:

{
  "list": [
    { "field": "value" },
    { "field": "value" },
    ...
    { "field": "value" }
  ],
  "meta": {
    ...
  }
}

我们能否为这样的实体应用类似的 publisher/flowable 模式,或者在将此类响应发送出去之前将其加载到内存中?

我们试过像这样的签名:

    @Get("all/dev")
    Single<ExamplesWrapper> getAllDev() {
        Publisher<Example> dev = dataStore.find();
        return Flowable.fromPublisher(dev)
                .map(mapper::map)
                .collect((Callable<ArrayList<Example>>) ArrayList::new, ArrayList::add)
                .map(ExampleWrapper::new);
    }

包装器将在其中添加一些元数据。但这再次将其全部加载到内存中,然后再发送出去,导致应用程序崩溃。

将 Flowable 添加到响应包装器中:


public class ExamplesWrapper {

    private final Flowable<Example> examples;

    @ConstructorProperties({"examples"})
    public ExamplesWrapper(Flowable<Example> examples) {
        this.examples = examples;
    }

    public Flowable<Example> getExamples() {
        return examples;
    }
}

也因一些不错的 Jackson 映射异常而失败。

元数据不依赖于实际示例数据(它添加了一些静态公司信息)。我们能否以某种方式实现这样的端点而不必将所有数据加载到内存中?

来自documentation

6.20 Writing Response Data

Reactively Writing Response Data

Micronaut’s HTTP server supports writing chunks of response data by returning a Publisher that emits objects that can be encoded to the HTTP response.

The following table summarizes example return type signatures and the behaviour the server exhibits to handle them: Return Type Description

  • Flowable<byte[]>: A Flowable that emits each chunk of content as a byte[] without blocking
  • Flux<ByteBuf>: A Reactor Flux that emits each chunk as a Netty ByteBuf
  • Publisher<String>: A Publisher that emits each chunk of content as a String
  • Flowable<Book> When emitting a POJO, each emitted object is encoded as JSON by default without blocking

When returning a reactive type, the server uses a Transfer-Encoding of chunked and keeps writing data until the Publisher onComplete method is called.

我明白这一点,所以如果你想让 Micronaut 机制流式传输你的东西,你需要有像 Flowable<item>Flux<item>Publisher<item> 这样的签名,其中项目是一个块您的回复,而不是完整的项目。然后 Micronaut 将响应来自 Flowable 或等效的块。

在这种情况下,我想到的一件事是您可以自己拆分成合适的块。这样流式传输大量响应而不将它们缓冲到内存中应该可行。

所以像这样:

@Get("all")
public Flowable<String> getAllExamples() {
    ObjectMapper objectMapper = new ObjectMapper();
    Publisher<Example> dev = dataStore.find();
    return Flowable.fromPublisher(dev)
            .map(mapper::map)
            .concatMap(item -> Flowable.just(objectMapper.writeValueAsString(item), ","))
            .startWith("{\"list\": [")
            .concatWith(Flowable.just("],\"meta\":\"whatever\"}"));
}

它很老套,但似乎适用于这种情况。


一些无效的方法:

我确实测试了在自定义 Jackson 映射器中直接写入 JsonGenerator,按照 jackson streaming api, but micronaut RoutingInboundHandler 中概述的那样刷新对象似乎不会将响应刷新回最终用户,而是对其进行缓冲,从而导致记忆。方法适用于 Spring Boot,因此它可能是 Micronaut 中缺少的功能。

我在使用 Micronaut 时也发生了同样的缓冲 Writeable (blocking) responses and trying to flush data as it is written. I opened an issue about that to micronaut core