Spring WebFlux with MongoDB - 限制 SSE 客户端

Spring WebFlux with MongoDB - throttling SSE clients

我正在通过 Spring Boot 2.1.1 和 WebFlux、Reactor 3.2.3、Mongo 3.8.2 和 Netty 4.1.31 开发一个简单的聊天服务 运行。

每个聊天室都有 2 个集合 - 消息存档和包含当前事件(例如新消息事件、用户输入指示器等)的上限集合。上限集合有 100 个元素,我正在使用 ReactiveMongoTemplate 的 tail() 方法来检索最新事件。

该服务公开了两种用于检索最近事件的端点:SSE 和轮询。我对 2000 个并发用户进行了一些压力测试,除了收听聊天外,还发送大量事件。

观测值是:

观察似乎很明显,因为当我在测试期间通过 SSE 连接时,当新事件到达时它几乎立即更新了我 - 基本上 SSE 的响应速度比每 2 秒轮询快数百倍。

问题是:

鉴于客户端最终是订阅者(或者至少我认为这是由有限的知识给出的),我可以通过 ReactiveMongoTemplate 以某种方式限制发布消息的速率吗?或者以某种方式减少对新事件的需求而不必在客户端进行?

我一直在尝试使用 Flux 缓冲和缓存,但它造成了更大的压力...

代码:

// ChatRepository.java

private static final Query chatEventsQuery = new Query();

public Flux<ChatEvent> getChatEventsStream(String chatId) {
    return reactiveMongoTemplate.tail(
            chatEventsQuery,
            ChatEvent.class,
            chatId
    );
}

,

// ChatHandler.java

public Mono<ServerResponse> getChatStream(ServerRequest request) {

    String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
    String username = getUsername(request);

    Flux<ServerSentEvent> chatEventsStream = chatRepository
            .getChatEventsStream(chatId)
            .map(addUserSpecificPropsToChatEvent(username))
            .map(event -> ServerSentEvent.<ChatEvent>builder()
                    .event(event.getType().getEventName())
                    .data(event)
                    .build());

    log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);

    return ServerResponse.ok().body(
            chatEventsStream,
            ServerSentEvent.class
    );
}

,

// ChatRouter.java

RouterFunction<ServerResponse> routes(ChatHandler handler) {
    return route(GET("/api/chat/{chatId}/stream"), handler::getChatStream);
}

答案是: 您可以使用 Flux.buffer 方法来完成。然后 flux 将以定义的速率将事件批​​量发送给订阅者。

我 post 编辑的代码有 2 个主要问题

  1. 鉴于多个用户通常都在收听一个聊天,我重构了 ChatRepository 以利用 "Hot"、可重放的 Fluxes(现在每个聊天我有 1 个流而不是 1 个流每个用户),我将其存储在 Caffeine 缓存中。 此外,我通过较短的时间间隔对它们进行缓冲,以避免在繁忙的聊天中将事件推送给客户端时占用大量资源。

  2. 我在 ChatRepository 中使用的 new Query() 是多余的。我 查看了 ReactiveMongoTemplate 的代码,如果是非空的 query 提供了,逻辑有点复杂。最好通过null 改为 ReactiveMongoTemplate 的 tail() 方法。

代码post-重构

// ChatRepository.java

public Flux<List<ChatEvent>> getChatEventsStream(String chatId) {
    return Optional.ofNullable(chatStreamsCache.getIfPresent(chatId))
            .orElseGet(newCachedChatEventsStream(chatId))
            .autoConnect();
}

private Supplier<ConnectableFlux<List<ChatEvent>>> newCachedChatEventsStream(String chatId) {
    return () -> {
        ConnectableFlux<List<ChatEvent>> chatEventsStream = reactiveMongoTemplate.tail(
                null,
                ChatEvent.class,
                chatId
        ).buffer(Duration.ofMillis(chatEventsBufferInterval))
                .replay(chatEventsReplayCount);

        chatStreamsCache.put(chatId, chatEventsStream);

        return chatEventsStream;
    };
}

,

// ChatHandler.java

public Mono<ServerResponse> getChatStream(ServerRequest request) {

    String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
    String username = getUsername(request);

    Flux<ServerSentEvent> chatEventsStream = chatRepository
            .getChatEventsStream(chatId)
            .map(addUserSpecificPropsToChatEvents(username))
            .map(event -> ServerSentEvent.<List<ChatEvent>>builder()
                    .event(CHAT_SSE_NAME)
                    .data(event)
                    .build());

    log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);

    return ServerResponse.ok().body(
            chatEventsStream,
            ServerSentEvent.class
    );
}

,

应用这些更改后,即使有 3000 个活跃用户,该服务也表现良好(JVM 使用了 ~50% CPU,Mongo ~7% 主要是由于大量插入 - 流是现在没那么明显了)