gpt4 book ai didi

java - 使用 Undertow WebSockets 高效发送大数据集

转载 作者:行者123 更新时间:2023-12-02 09:20:32 24 4
gpt4 key购买 nike

我有一个大型 ConcurrentHashMap ( cache.getCache() ),其中保存了所有数据(大约 500+ MB 大小,但这可能会随着时间的推移而增长)。客户端可以通过使用普通 java HttpServer 实现的 API 来访问它。这是简化的代码:

JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(new BufferedOutputStream(new GZIPOutputStream(exchange.getResponseBody())))));
new GsonBuilder().create().toJson(cache.getCache(), CacheContainer.class, jsonWriter);

还有一些客户端发送的过滤器,因此它们实际上不会每次都获取所有数据,但 HashMap 会不断更新,因此客户端必须经常刷新才能获得最新数据。这效率很低,所以我决定使用 WebSockets 将数据更新实时推送到客户端。

我为此选择了 Undertow,因为我可以简单地从 Maven 导入它,并且不需要在服务器上进行额外的配置。

在 WS 连接上,我将 channel 添加到 HashSet 并发送整个数据集(客户端在获取初始数据之前发送带有一些过滤器的消息,但我从示例中删除了这部分):

public class MyConnectionCallback implements WebSocketConnectionCallback {
CacheContainer cache;
Set<WebSocketChannel> clients = new HashSet<>();
BlockingQueue<String> queue = new LinkedBlockingQueue<>();

public MyConnectionCallback(CacheContainer cache) {
this.cache = cache;
Thread pusherThread = new Thread(() -> {
while (true) {
push(queue.take());
}
});
pusherThread.start();
}

public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) {
webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() {
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
clients.add(webSocketChannel);
WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
}
}
}

private void push(String message) {
Set<WebSocketChannel> closed = new HashSet<>();
clients.forEach((webSocketChannel) -> {
if (webSocketChannel.isOpen()) {
WebSockets.sendText(message, webSocketChannel, null);
} else {
closed.add(webSocketChannel);
}
}
closed.foreach(clients::remove);
}

public void putMessage(String message) {
queue.put(message);
}
}

每次对缓存进行更改后,我都会获取新值并将其放入队列中(我不直接序列化 myUpdate 对象,因为 updateCache 方法背后还有其他逻辑)。只有一个线程负责更新缓存:

cache.updateCache(key, myUpdate);
Map<Key,Value> tempMap = new HashMap<>();
tempMap.put(key, cache.getValue(key));
webSocketServer.putMessage(gson.toJson(tempMap));

我发现这种方法存在的问题:

  1. 在初始连接时,整个数据集被转换为字符串,我担心太多的请求可能会导致服务器 OOM。 WebSockets.sendText 仅接受 String 和 ByteBuffer
  2. 如果我先将 channel 添加到客户端集,然后发送数据,则在发送初始数据之前可能会向客户端推送,并且客户端将处于无效状态
  3. 如果我先发送初始数据,然后将 channel 添加到客户端集合中,则发送初始数据期间的推送消息将会丢失,客户端将处于无效状态

我针对问题 #2 和 #3 提出的解决方案是将消息放入队列中(我会将 Set<WebSocketChannel> 转换为 Map<WebSocketChannel,Queue<String>> 并仅在客户端收到初始消息后才将消息发送到队列中数据集,但我欢迎在这里提出任何其他建议。

对于问题#1,我的问题是通过 WebSocket 发送初始数据的最有效方法是什么?例如,使用 JsonWriter 直接写入 WebSocket。

我意识到客户端可以使用 API 进行初始调用并订阅 WebSocket 进行更改,但这种方法使客户端负责拥有正确的状态(它们需要订阅 WS、对 WS 消息进行排队、获取初始数据)使用 API,然后在获取初始数据后将排队的 WS 消息应用到其数据集),我不想将控制权留给他们,因为数据很敏感。

最佳答案

#2 和 #3 的问题似乎与不同线程能够同时向客户端发送数据状态有关。因此,除了您的方法之外,您还可以考虑其他两种同步方法。

  1. 使用互斥锁来保护对数据和客户端发送的访问。这会序列化读取数据并将其发送到客户端,因此(伪)代码变得如下:
protected void onFullTextMessage(...) {
LOCK {
clients.add(webSocketChannel);
WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
}
}

void push(String message) {
Set<WebSocketChannel> closed = new HashSet<>();
LOCK {
clients.forEach((webSocketChannel) -> {
if (webSocketChannel.isOpen()) {
WebSockets.sendText(message, webSocketChannel, null);
} else {
closed.add(webSocketChannel);
}
}
}
closed.foreach(clients::remove);
}
  • 创建一个新的类和服务线程,专门负责管理数据缓存的更改并将这些更改推送到客户端;它将使用内部同步队列来异步处理方法调用,并跟踪连接的客户端,它将具有如下接口(interface):
  • public void update_cache(....);
    public void add_new_client(WebSocketChannel);

    ...每个调用都会排队要在对象内部线程上完成的操作。这保证了初始快照和更新的顺序,因为只有一个线程执行更改缓存并将这些更改传播给订阅者的工作。

    对于#1,如果您使用方法#2,那么您可以缓存数据的序列化状态,以便在以后的快照上重用(前提是它同时没有被更改)。正如评论中所述:只有当以后的客户端具有相同的过滤器配置时,这才有效。

    关于java - 使用 Undertow WebSockets 高效发送大数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58726602/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com