使用 Undertow WebSockets 高效地发送大数据集
Send large data set using Undertow WebSockets efficiently
我有一个很大的 ConcurrentHashMap (cache.getCache()
),我在其中保存我的所有数据(大约 500+ MB 大小,但它会随着时间的推移而增长)。客户端可以通过使用纯 java HttpServer 实现的 API 访问它。
这是简化的代码:
JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(new BufferedOutputStream(new GZIPOutputStream(exchange.getResponseBody())))));
new GsonBuilder().create().toJson(cache.getCache(), CacheContainer.class, jsonWriter);
还有一些客户端发送的过滤器,因此它们实际上并不是每次都获取所有数据,但是 HashMap 会不断更新,因此客户端必须经常刷新才能获得最新数据。这是低效的,所以我决定使用 WebSockets 将数据更新实时推送到客户端。
我为此选择了 Undertow,因为我可以简单地从 Maven 中导入它,而无需在服务器上进行额外的配置。
在 WS 连接上,我将通道添加到 HashSet 并发送整个数据集(客户端在获取初始数据之前发送带有一些过滤器的消息,但我从示例中删除了这部分):
public class MyConnectionCallback implements WebSocketConnectionCallback {
CacheContainer cache;
Set<WebSocketChannel> clients = new HashSet<>();
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public MyConnectionCallback(CacheContainer cache) {
this.cache = cache;
Thread pusherThread = new Thread(() -> {
while (true) {
push(queue.take());
}
});
pusherThread.start();
}
public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) {
webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() {
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
clients.add(webSocketChannel);
WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
}
}
}
private void push(String message) {
Set<WebSocketChannel> closed = new HashSet<>();
clients.forEach((webSocketChannel) -> {
if (webSocketChannel.isOpen()) {
WebSockets.sendText(message, webSocketChannel, null);
} else {
closed.add(webSocketChannel);
}
}
closed.foreach(clients::remove);
}
public void putMessage(String message) {
queue.put(message);
}
}
每次更改我的缓存后,我都会获取新值并将其放入队列(我不直接序列化 myUpdate
对象,因为 updateCache 方法背后还有其他逻辑)。只有一个线程负责更新缓存:
cache.updateCache(key, myUpdate);
Map<Key,Value> tempMap = new HashMap<>();
tempMap.put(key, cache.getValue(key));
webSocketServer.putMessage(gson.toJson(tempMap));
我发现这种方法存在的问题:
- 在初始连接时,整个数据集被转换为一个字符串,我担心太多的请求会导致服务器变成 OOM。 WebSockets.sendText 只接受 String 和 ByteBuffer
- 如果我先将通道添加到客户端集,然后再发送数据,在发送初始数据之前,推送可能会到达客户端,客户端将处于无效状态
- 如果我先发送初始数据,然后将通道添加到客户端集,则发送初始数据期间到来的推送消息将丢失,客户端将处于无效状态
我为问题 #2 和 #3 提出的解决方案是将消息放入队列(我会将 Set<WebSocketChannel>
转换为 Map<WebSocketChannel,Queue<String>>
并仅在队列中发送消息客户收到初始数据集后,但我欢迎任何其他建议。
至于问题 #1,我的问题是通过 WebSocket 发送初始数据的最有效方式是什么?例如,使用 JsonWriter 直接写入 WebSocket。
我意识到客户端可以使用 API 进行初始调用并订阅 WebSocket 以进行更改,但这种方法使客户端负责拥有正确的状态(他们需要订阅 WS、队列WS 消息,使用 API 获取初始数据,然后在获取初始数据后将排队的 WS 消息应用于他们的数据集)我不想将控制权留给他们,因为数据是敏感的。
看来#2 和#3 的问题与不同线程能够同时向客户端发送数据状态有关。所以除了你的方法之外,你还可以考虑另外两种同步方法。
- 使用互斥锁来保护对数据和客户端发送的访问。这将数据读取和发送序列化到客户端,因此(伪)代码变为:
protected void onFullTextMessage(...) {
LOCK {
clients.add(webSocketChannel);
WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
}
}
void push(String message) {
Set<WebSocketChannel> closed = new HashSet<>();
LOCK {
clients.forEach((webSocketChannel) -> {
if (webSocketChannel.isOpen()) {
WebSockets.sendText(message, webSocketChannel, null);
} else {
closed.add(webSocketChannel);
}
}
}
closed.foreach(clients::remove);
}
- 创建一个新的 class 和服务线程,它全权负责管理对数据缓存的更改并将这些更改推送给客户端;它将使用内部同步队列来异步处理方法调用,并跟踪已连接的客户端,它将具有如下接口:
public void update_cache(....);
public void add_new_client(WebSocketChannel);
... 这些调用中的每一个都查询一个要在对象内部线程上完成的操作。这保证了初始快照和更新的顺序,因为只有一个线程执行更改缓存并将这些更改传播给订阅者的工作。
至于 #1,如果您使用方法 #2,那么您可以缓存数据的序列化状态,允许在以后的快照中重用(前提是它同时没有被更改)。如评论中所述:这仅在以后的客户端具有相同的过滤器配置时才有效。
为了解决问题 #2 和 #3,我在每个客户端上设置了一个推锁标志,只有在发送初始数据时才会解锁。设置推锁后,到达的消息将放置在该客户端队列中。然后在任何新消息之前发送排队的消息。
我通过直接使用 ByteBuffer 而不是 String 来缓解问题 #1。这样我可以节省一些内存因为编码(字符串默认使用UTF-16)
最终代码:
public class WebSocketClient {
private boolean pushLock;
private Gson gson;
private Queue<CacheContainer> queue = new ConcurrentLinkedQueue<>();
WebSocketClient(MyQuery query, CacheHandler cacheHandler) {
pushLock = true;
this.gson = GsonFactory.getGson(query, cacheHandler);
}
public synchronized boolean isPushLock() {
return pushLock;
}
public synchronized void pushUnlock() {
pushLock = false;
}
public Gson getGson() {
return gson;
}
public Queue<CacheContainer> getQueue() {
return queue;
}
public boolean hasBackLog() {
return !queue.isEmpty();
}
}
public class MyConnectionCallback implements WebSocketConnectionCallback {
private final Map<WebSocketChannel, WebSocketClient> clients = new ConcurrentHashMap<>();
private final BlockingQueue<CacheContainer> messageQueue = new LinkedBlockingQueue<>();
private final Gson queryGson = new GsonBuilder().disableHtmlEscaping().create();
private final CacheHandler cacheHandler;
MyConnectionCallback(CacheHandler cacheHandler) {
this.cacheHandler = cacheHandler;
Thread pusherThread = new Thread(() -> {
boolean hasPushLock = false;
while (true) {
if (messageQueue.isEmpty() && hasPushLock) hasPushLock = pushToAllClients(null);
else hasPushLock = pushToAllClients(messageQueue.take());
}
}, "PusherThread");
pusherThread.start();
}
@Override
public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) {
webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() {
@Override
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) throws IOException {
MyQuery query = new MyQuery(queryGson.fromJson(message.getData(), QueryJson.class));
WebSocketClient clientConfig = new WebSocketClient(query, cacheHandler);
clients.put(webSocketChannel, clientConfig);
push(webSocketChannel, clientConfig.getGson(), cacheHandler.getCache());
clientConfig.pushUnlock();
}
});
webSocketChannel.resumeReceives();
}
void putMessage(CacheContainer message) {
messageQueue.put(message);
}
private synchronized void push(WebSocketChannel webSocketChannel, Gson gson, CacheContainer message) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(baos, StandardCharsets.UTF_8))) {
gson.toJson(message, CacheContainer.class, jsonWriter);
jsonWriter.flush();
if (baos.size() > 2) {
WebSockets.sendText(ByteBuffer.wrap(baos.toByteArray()), webSocketChannel, null);
}
}
}
private synchronized boolean pushToAllClients(CacheContainer message) {
AtomicBoolean hadPushLock = new AtomicBoolean(false);
Set<WebSocketChannel> closed = new HashSet<>();
clients.forEach((webSocketChannel, clientConfig) -> {
if (webSocketChannel.isOpen()) {
if (clientConfig.isPushLock()) {
hadPushLock.set(true);
clientConfig.getQueue().add(message);
} else {
try {
if (clientConfig.hasBackLog())
pushBackLog(webSocketChannel, clientConfig);
if (message != null)
push(webSocketChannel, clientConfig.getGson(), message);
} catch (Exception e) {
closeChannel(webSocketChannel, closed);
}
}
} else {
closed.add(webSocketChannel);
}
});
closed.forEach(clients::remove);
return hadPushLock.get();
}
private void pushBackLog(WebSocketChannel webSocketChannel, WebSocketClient clientConfig) throws IOException {
while (clientConfig.hasBackLog()) {
push(webSocketChannel, clientConfig.getGson(), clientConfig.getQueue().poll());
}
}
private void closeChannel(WebSocketChannel channel, Set<WebSocketChannel> closed) {
closed.add(channel);
channel.close();
}
}
我有一个很大的 ConcurrentHashMap (cache.getCache()
),我在其中保存我的所有数据(大约 500+ MB 大小,但它会随着时间的推移而增长)。客户端可以通过使用纯 java HttpServer 实现的 API 访问它。
这是简化的代码:
JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(new BufferedOutputStream(new GZIPOutputStream(exchange.getResponseBody())))));
new GsonBuilder().create().toJson(cache.getCache(), CacheContainer.class, jsonWriter);
还有一些客户端发送的过滤器,因此它们实际上并不是每次都获取所有数据,但是 HashMap 会不断更新,因此客户端必须经常刷新才能获得最新数据。这是低效的,所以我决定使用 WebSockets 将数据更新实时推送到客户端。
我为此选择了 Undertow,因为我可以简单地从 Maven 中导入它,而无需在服务器上进行额外的配置。
在 WS 连接上,我将通道添加到 HashSet 并发送整个数据集(客户端在获取初始数据之前发送带有一些过滤器的消息,但我从示例中删除了这部分):
public class MyConnectionCallback implements WebSocketConnectionCallback {
CacheContainer cache;
Set<WebSocketChannel> clients = new HashSet<>();
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public MyConnectionCallback(CacheContainer cache) {
this.cache = cache;
Thread pusherThread = new Thread(() -> {
while (true) {
push(queue.take());
}
});
pusherThread.start();
}
public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) {
webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() {
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
clients.add(webSocketChannel);
WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
}
}
}
private void push(String message) {
Set<WebSocketChannel> closed = new HashSet<>();
clients.forEach((webSocketChannel) -> {
if (webSocketChannel.isOpen()) {
WebSockets.sendText(message, webSocketChannel, null);
} else {
closed.add(webSocketChannel);
}
}
closed.foreach(clients::remove);
}
public void putMessage(String message) {
queue.put(message);
}
}
每次更改我的缓存后,我都会获取新值并将其放入队列(我不直接序列化 myUpdate
对象,因为 updateCache 方法背后还有其他逻辑)。只有一个线程负责更新缓存:
cache.updateCache(key, myUpdate);
Map<Key,Value> tempMap = new HashMap<>();
tempMap.put(key, cache.getValue(key));
webSocketServer.putMessage(gson.toJson(tempMap));
我发现这种方法存在的问题:
- 在初始连接时,整个数据集被转换为一个字符串,我担心太多的请求会导致服务器变成 OOM。 WebSockets.sendText 只接受 String 和 ByteBuffer
- 如果我先将通道添加到客户端集,然后再发送数据,在发送初始数据之前,推送可能会到达客户端,客户端将处于无效状态
- 如果我先发送初始数据,然后将通道添加到客户端集,则发送初始数据期间到来的推送消息将丢失,客户端将处于无效状态
我为问题 #2 和 #3 提出的解决方案是将消息放入队列(我会将 Set<WebSocketChannel>
转换为 Map<WebSocketChannel,Queue<String>>
并仅在队列中发送消息客户收到初始数据集后,但我欢迎任何其他建议。
至于问题 #1,我的问题是通过 WebSocket 发送初始数据的最有效方式是什么?例如,使用 JsonWriter 直接写入 WebSocket。
我意识到客户端可以使用 API 进行初始调用并订阅 WebSocket 以进行更改,但这种方法使客户端负责拥有正确的状态(他们需要订阅 WS、队列WS 消息,使用 API 获取初始数据,然后在获取初始数据后将排队的 WS 消息应用于他们的数据集)我不想将控制权留给他们,因为数据是敏感的。
看来#2 和#3 的问题与不同线程能够同时向客户端发送数据状态有关。所以除了你的方法之外,你还可以考虑另外两种同步方法。
- 使用互斥锁来保护对数据和客户端发送的访问。这将数据读取和发送序列化到客户端,因此(伪)代码变为:
protected void onFullTextMessage(...) {
LOCK {
clients.add(webSocketChannel);
WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
}
}
void push(String message) {
Set<WebSocketChannel> closed = new HashSet<>();
LOCK {
clients.forEach((webSocketChannel) -> {
if (webSocketChannel.isOpen()) {
WebSockets.sendText(message, webSocketChannel, null);
} else {
closed.add(webSocketChannel);
}
}
}
closed.foreach(clients::remove);
}
- 创建一个新的 class 和服务线程,它全权负责管理对数据缓存的更改并将这些更改推送给客户端;它将使用内部同步队列来异步处理方法调用,并跟踪已连接的客户端,它将具有如下接口:
public void update_cache(....);
public void add_new_client(WebSocketChannel);
... 这些调用中的每一个都查询一个要在对象内部线程上完成的操作。这保证了初始快照和更新的顺序,因为只有一个线程执行更改缓存并将这些更改传播给订阅者的工作。
至于 #1,如果您使用方法 #2,那么您可以缓存数据的序列化状态,允许在以后的快照中重用(前提是它同时没有被更改)。如评论中所述:这仅在以后的客户端具有相同的过滤器配置时才有效。
为了解决问题 #2 和 #3,我在每个客户端上设置了一个推锁标志,只有在发送初始数据时才会解锁。设置推锁后,到达的消息将放置在该客户端队列中。然后在任何新消息之前发送排队的消息。
我通过直接使用 ByteBuffer 而不是 String 来缓解问题 #1。这样我可以节省一些内存因为编码(字符串默认使用UTF-16)
最终代码:
public class WebSocketClient {
private boolean pushLock;
private Gson gson;
private Queue<CacheContainer> queue = new ConcurrentLinkedQueue<>();
WebSocketClient(MyQuery query, CacheHandler cacheHandler) {
pushLock = true;
this.gson = GsonFactory.getGson(query, cacheHandler);
}
public synchronized boolean isPushLock() {
return pushLock;
}
public synchronized void pushUnlock() {
pushLock = false;
}
public Gson getGson() {
return gson;
}
public Queue<CacheContainer> getQueue() {
return queue;
}
public boolean hasBackLog() {
return !queue.isEmpty();
}
}
public class MyConnectionCallback implements WebSocketConnectionCallback {
private final Map<WebSocketChannel, WebSocketClient> clients = new ConcurrentHashMap<>();
private final BlockingQueue<CacheContainer> messageQueue = new LinkedBlockingQueue<>();
private final Gson queryGson = new GsonBuilder().disableHtmlEscaping().create();
private final CacheHandler cacheHandler;
MyConnectionCallback(CacheHandler cacheHandler) {
this.cacheHandler = cacheHandler;
Thread pusherThread = new Thread(() -> {
boolean hasPushLock = false;
while (true) {
if (messageQueue.isEmpty() && hasPushLock) hasPushLock = pushToAllClients(null);
else hasPushLock = pushToAllClients(messageQueue.take());
}
}, "PusherThread");
pusherThread.start();
}
@Override
public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) {
webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() {
@Override
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) throws IOException {
MyQuery query = new MyQuery(queryGson.fromJson(message.getData(), QueryJson.class));
WebSocketClient clientConfig = new WebSocketClient(query, cacheHandler);
clients.put(webSocketChannel, clientConfig);
push(webSocketChannel, clientConfig.getGson(), cacheHandler.getCache());
clientConfig.pushUnlock();
}
});
webSocketChannel.resumeReceives();
}
void putMessage(CacheContainer message) {
messageQueue.put(message);
}
private synchronized void push(WebSocketChannel webSocketChannel, Gson gson, CacheContainer message) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(baos, StandardCharsets.UTF_8))) {
gson.toJson(message, CacheContainer.class, jsonWriter);
jsonWriter.flush();
if (baos.size() > 2) {
WebSockets.sendText(ByteBuffer.wrap(baos.toByteArray()), webSocketChannel, null);
}
}
}
private synchronized boolean pushToAllClients(CacheContainer message) {
AtomicBoolean hadPushLock = new AtomicBoolean(false);
Set<WebSocketChannel> closed = new HashSet<>();
clients.forEach((webSocketChannel, clientConfig) -> {
if (webSocketChannel.isOpen()) {
if (clientConfig.isPushLock()) {
hadPushLock.set(true);
clientConfig.getQueue().add(message);
} else {
try {
if (clientConfig.hasBackLog())
pushBackLog(webSocketChannel, clientConfig);
if (message != null)
push(webSocketChannel, clientConfig.getGson(), message);
} catch (Exception e) {
closeChannel(webSocketChannel, closed);
}
}
} else {
closed.add(webSocketChannel);
}
});
closed.forEach(clients::remove);
return hadPushLock.get();
}
private void pushBackLog(WebSocketChannel webSocketChannel, WebSocketClient clientConfig) throws IOException {
while (clientConfig.hasBackLog()) {
push(webSocketChannel, clientConfig.getGson(), clientConfig.getQueue().poll());
}
}
private void closeChannel(WebSocketChannel channel, Set<WebSocketChannel> closed) {
closed.add(channel);
channel.close();
}
}