gpt4 book ai didi

go - 客户端同步中的竞争条件

转载 作者:IT王子 更新时间:2023-10-29 02:09:24 25 4
gpt4 key购买 nike

我有一个网络应用程序,其服务器为每个 websocket 连接创建一个客户端。 Client 充当 websocket 连接和 Hub 的单个实例之间的中介。 Hub 维护一组已注册的客户端并向客户端广播消息。这工作得很好,但问题是客户端可能会错过服务器生成客户端在连接时接收的初始状态包与客户端注册到集线器并开始接收广播事件之间的事件。

我的想法是在从数据库中获取任何信息之前向集线器注册客户端。这将确保客户端不会错过任何广播,尽管现在它可以接收到已经应用于它接收到的初始状态的消息。为了让客户端忽略这些消息,我可以在初始状态包和广播事件中包含一个单调时间戳。

你能想到一个更优雅/更简单的解决方案吗?

最佳答案

我过去曾使用预写日志来做这样的事情。简而言之,在集线器中保留消息的环形缓冲区。然后重播在初始化新客户端时发送给现有客户端的消息。

如果您愿意,您也可以向客户展示这个概念。这样你就可以实现高效的重新连接(特别适合移动连接)。当客户端断开 websocket 连接时,他们可以重新连接并说“嘿,又是我。看起来我们被打扰了。我看到的最后一条消息是 42 号。有什么新消息吗?”

以下是凭内存,所以仅作为思路的说明,并非完成的实现。例如,为了简洁起见,我省略了围绕 client.send 的选择语句。

package main

import (
"container/list"
"sync"

"github.com/gorilla/websocket"
)

type Client struct { // all unchanged
hub *Hub
conn *websocket.Conn
send chan []byte
}

type Hub struct {
mu *sync.RWMutex
wal list.List // List if recent messages
clients map[*Client]bool // Registered clients.

register chan Registration // not a chan *Client anymore

broadcast chan []byte
unregister chan *Client
}

type Registration struct {
client *Client

// init is a function that is executed before the client starts to receive
// broadcast messages. All messages that are broadcast while init is
// running will be sent after init returns.
init func()
}

func (h *Hub) run() {
for {
select {
case reg := <-h.register:
// Take note of the most recent message as of right now.
// initClient will replay all later messages
h.mu.RLock()
head := h.wal.Back()
h.mu.RUnlock()

go h.initClient(reg, head)
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
h.mu.Unlock()
case message := <-h.broadcast:
h.mu.Lock()
h.wal.PushBack(message)
// TODO: Trim list if too long by some metric (e.g. number of
// messages, age, total message size, etc.)

clients := make([]*Client, len(h.clients))
copy(clients, h.clients)
h.mu.Unlock()

for client := range clients {
// TODO: deal with backpressure
client.send <- message
}
}
}
}

func (h *Hub) initClient(reg Registration, head *list.Element) {
reg.init()

// send messages in h.wal after head
for {
h.mu.RLock()
head = head.Next()
if head == nil {
// caught up
h.clients[reg.client] = true
h.mu.RUnlock()
return
}
h.mu.RUnlock()

// TODO: deal with backpressure
reg.client.send <- head.Value.([]byte)
}
}

关于go - 客户端同步中的竞争条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51756075/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com