Spring WebFlux with MongoDB - 限制 SSE 客户端
Spring WebFlux with MongoDB - throttling SSE clients
我正在通过 Spring Boot 2.1.1 和 WebFlux、Reactor 3.2.3、Mongo 3.8.2 和 Netty 4.1.31 开发一个简单的聊天服务 运行。
每个聊天室都有 2 个集合 - 消息存档和包含当前事件(例如新消息事件、用户输入指示器等)的上限集合。上限集合有 100 个元素,我正在使用 ReactiveMongoTemplate 的 tail() 方法来检索最新事件。
该服务公开了两种用于检索最近事件的端点:SSE 和轮询。我对 2000 个并发用户进行了一些压力测试,除了收听聊天外,还发送大量事件。
观测值是:
- 每 2 秒轮询一次会给服务带来一点压力(测试期间使用率约为 40% CPU),而对 MongoDB (~4%) 几乎没有压力
- 通过 SSE 收听最大化了 MongoDB (~90%),也强调了服务(它试图使用剩余的可用资源),但 Mongo 特别挣扎和整体服务几乎没有响应。
观察似乎很明显,因为当我在测试期间通过 SSE 连接时,当新事件到达时它几乎立即更新了我 - 基本上 SSE 的响应速度比每 2 秒轮询快数百倍。
问题是:
鉴于客户端最终是订阅者(或者至少我认为这是由有限的知识给出的),我可以通过 ReactiveMongoTemplate 以某种方式限制发布消息的速率吗?或者以某种方式减少对新事件的需求而不必在客户端进行?
我一直在尝试使用 Flux 缓冲和缓存,但它造成了更大的压力...
代码:
// ChatRepository.java
private static final Query chatEventsQuery = new Query();
public Flux<ChatEvent> getChatEventsStream(String chatId) {
return reactiveMongoTemplate.tail(
chatEventsQuery,
ChatEvent.class,
chatId
);
}
,
// ChatHandler.java
public Mono<ServerResponse> getChatStream(ServerRequest request) {
String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
String username = getUsername(request);
Flux<ServerSentEvent> chatEventsStream = chatRepository
.getChatEventsStream(chatId)
.map(addUserSpecificPropsToChatEvent(username))
.map(event -> ServerSentEvent.<ChatEvent>builder()
.event(event.getType().getEventName())
.data(event)
.build());
log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);
return ServerResponse.ok().body(
chatEventsStream,
ServerSentEvent.class
);
}
,
// ChatRouter.java
RouterFunction<ServerResponse> routes(ChatHandler handler) {
return route(GET("/api/chat/{chatId}/stream"), handler::getChatStream);
}
答案是:
您可以使用 Flux.buffer
方法来完成。然后 flux 将以定义的速率将事件批量发送给订阅者。
我 post 编辑的代码有 2 个主要问题
鉴于多个用户通常都在收听一个聊天,我重构了 ChatRepository 以利用 "Hot"、可重放的 Fluxes(现在每个聊天我有 1 个流而不是 1 个流每个用户),我将其存储在 Caffeine 缓存中。
此外,我通过较短的时间间隔对它们进行缓冲,以避免在繁忙的聊天中将事件推送给客户端时占用大量资源。
我在 ChatRepository 中使用的 new Query()
是多余的。我
查看了 ReactiveMongoTemplate 的代码,如果是非空的
query 提供了,逻辑有点复杂。最好通过null
改为 ReactiveMongoTemplate 的 tail()
方法。
代码post-重构
// ChatRepository.java
public Flux<List<ChatEvent>> getChatEventsStream(String chatId) {
return Optional.ofNullable(chatStreamsCache.getIfPresent(chatId))
.orElseGet(newCachedChatEventsStream(chatId))
.autoConnect();
}
private Supplier<ConnectableFlux<List<ChatEvent>>> newCachedChatEventsStream(String chatId) {
return () -> {
ConnectableFlux<List<ChatEvent>> chatEventsStream = reactiveMongoTemplate.tail(
null,
ChatEvent.class,
chatId
).buffer(Duration.ofMillis(chatEventsBufferInterval))
.replay(chatEventsReplayCount);
chatStreamsCache.put(chatId, chatEventsStream);
return chatEventsStream;
};
}
,
// ChatHandler.java
public Mono<ServerResponse> getChatStream(ServerRequest request) {
String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
String username = getUsername(request);
Flux<ServerSentEvent> chatEventsStream = chatRepository
.getChatEventsStream(chatId)
.map(addUserSpecificPropsToChatEvents(username))
.map(event -> ServerSentEvent.<List<ChatEvent>>builder()
.event(CHAT_SSE_NAME)
.data(event)
.build());
log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);
return ServerResponse.ok().body(
chatEventsStream,
ServerSentEvent.class
);
}
,
应用这些更改后,即使有 3000 个活跃用户,该服务也表现良好(JVM 使用了 ~50% CPU,Mongo ~7% 主要是由于大量插入 - 流是现在没那么明显了)
我正在通过 Spring Boot 2.1.1 和 WebFlux、Reactor 3.2.3、Mongo 3.8.2 和 Netty 4.1.31 开发一个简单的聊天服务 运行。
每个聊天室都有 2 个集合 - 消息存档和包含当前事件(例如新消息事件、用户输入指示器等)的上限集合。上限集合有 100 个元素,我正在使用 ReactiveMongoTemplate 的 tail() 方法来检索最新事件。
该服务公开了两种用于检索最近事件的端点:SSE 和轮询。我对 2000 个并发用户进行了一些压力测试,除了收听聊天外,还发送大量事件。
观测值是:
- 每 2 秒轮询一次会给服务带来一点压力(测试期间使用率约为 40% CPU),而对 MongoDB (~4%) 几乎没有压力
- 通过 SSE 收听最大化了 MongoDB (~90%),也强调了服务(它试图使用剩余的可用资源),但 Mongo 特别挣扎和整体服务几乎没有响应。
观察似乎很明显,因为当我在测试期间通过 SSE 连接时,当新事件到达时它几乎立即更新了我 - 基本上 SSE 的响应速度比每 2 秒轮询快数百倍。
问题是:
鉴于客户端最终是订阅者(或者至少我认为这是由有限的知识给出的),我可以通过 ReactiveMongoTemplate 以某种方式限制发布消息的速率吗?或者以某种方式减少对新事件的需求而不必在客户端进行?
我一直在尝试使用 Flux 缓冲和缓存,但它造成了更大的压力...
代码:
// ChatRepository.java
private static final Query chatEventsQuery = new Query();
public Flux<ChatEvent> getChatEventsStream(String chatId) {
return reactiveMongoTemplate.tail(
chatEventsQuery,
ChatEvent.class,
chatId
);
}
,
// ChatHandler.java
public Mono<ServerResponse> getChatStream(ServerRequest request) {
String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
String username = getUsername(request);
Flux<ServerSentEvent> chatEventsStream = chatRepository
.getChatEventsStream(chatId)
.map(addUserSpecificPropsToChatEvent(username))
.map(event -> ServerSentEvent.<ChatEvent>builder()
.event(event.getType().getEventName())
.data(event)
.build());
log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);
return ServerResponse.ok().body(
chatEventsStream,
ServerSentEvent.class
);
}
,
// ChatRouter.java
RouterFunction<ServerResponse> routes(ChatHandler handler) {
return route(GET("/api/chat/{chatId}/stream"), handler::getChatStream);
}
答案是:
您可以使用 Flux.buffer
方法来完成。然后 flux 将以定义的速率将事件批量发送给订阅者。
我 post 编辑的代码有 2 个主要问题
鉴于多个用户通常都在收听一个聊天,我重构了 ChatRepository 以利用 "Hot"、可重放的 Fluxes(现在每个聊天我有 1 个流而不是 1 个流每个用户),我将其存储在 Caffeine 缓存中。 此外,我通过较短的时间间隔对它们进行缓冲,以避免在繁忙的聊天中将事件推送给客户端时占用大量资源。
我在 ChatRepository 中使用的
new Query()
是多余的。我 查看了 ReactiveMongoTemplate 的代码,如果是非空的 query 提供了,逻辑有点复杂。最好通过null
改为 ReactiveMongoTemplate 的tail()
方法。
代码post-重构
// ChatRepository.java
public Flux<List<ChatEvent>> getChatEventsStream(String chatId) {
return Optional.ofNullable(chatStreamsCache.getIfPresent(chatId))
.orElseGet(newCachedChatEventsStream(chatId))
.autoConnect();
}
private Supplier<ConnectableFlux<List<ChatEvent>>> newCachedChatEventsStream(String chatId) {
return () -> {
ConnectableFlux<List<ChatEvent>> chatEventsStream = reactiveMongoTemplate.tail(
null,
ChatEvent.class,
chatId
).buffer(Duration.ofMillis(chatEventsBufferInterval))
.replay(chatEventsReplayCount);
chatStreamsCache.put(chatId, chatEventsStream);
return chatEventsStream;
};
}
,
// ChatHandler.java
public Mono<ServerResponse> getChatStream(ServerRequest request) {
String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
String username = getUsername(request);
Flux<ServerSentEvent> chatEventsStream = chatRepository
.getChatEventsStream(chatId)
.map(addUserSpecificPropsToChatEvents(username))
.map(event -> ServerSentEvent.<List<ChatEvent>>builder()
.event(CHAT_SSE_NAME)
.data(event)
.build());
log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);
return ServerResponse.ok().body(
chatEventsStream,
ServerSentEvent.class
);
}
,
应用这些更改后,即使有 3000 个活跃用户,该服务也表现良好(JVM 使用了 ~50% CPU,Mongo ~7% 主要是由于大量插入 - 流是现在没那么明显了)