gpt4 book ai didi

go - 如何使用 channel 广播消息

转载 作者:IT老高 更新时间:2023-10-28 13:04:00 26 4
gpt4 key购买 nike

我是新手,我正在尝试创建一个简单的聊天服务器,客户端可以在其中向所有连接的客户端广播消息。

在我的服务器中,我有一个接受连接的 goroutine(无限循环),并且所有连接都由 channel 接收。

go func() {
for {
conn, _ := listener.Accept()
ch <- conn
}
}()

然后,我为每个连接的客户端启动一个处理程序(goroutine)。在处理程序内部,我尝试通过迭代 channel 来广播到所有连接。

for c := range ch {
conn.Write(msg)
}

但是,我无法广播,因为(我认为通过阅读文档) channel 需要在迭代之前关闭。我不确定何时应该关闭 channel ,因为我想不断接受新的连接,而关闭 channel 不会让我这样做。如果有人可以帮助我,或者提供一种更好的方式向所有连接的客户端广播消息,将不胜感激。

最佳答案

你正在做的是一个扇出模式,也就是说,多个端点正在监听一个输入源。这种模式的结果是,只要输入源中有消息,这些监听器中只有一个能够获取消息。唯一的异常(exception)是 channel 的 close。此 close 将被所有听众识别,因此是“广播”。

但是你想做的是广播一条从连接中读取的消息,所以我们可以这样做:

当听众人数已知时

让每个worker监听专用的广播 channel ,并将消息从主 channel 分发到每个专用的广播 channel 。

type worker struct {
source chan interface{}
quit chan struct{}
}

func (w *worker) Start() {
w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
go func() {
for {
select {
case msg := <-w.source
// do something with msg
case <-quit: // will explain this in the last section
return
}
}
}()
}

然后我们可以有一堆 worker :

workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }

然后启动我们的监听器:

go func() {
for {
conn, _ := listener.Accept()
ch <- conn
}
}()

还有一个调度员:

go func() {
for {
msg := <- ch
for _, worker := workers {
worker.source <- msg
}
}
}()

当听众人数未知时

在这种情况下,上面给出的解决方案仍然有效。唯一的区别是,当你需要一个新的worker时,你需要创建一个新的worker,启动它,然后将它插入workers slice。但是这种方法需要一个线程安全的 slice ,它需要一个锁。其中一种实现可能如下所示:

type threadSafeSlice struct {
sync.Mutex
workers []*worker
}

func (slice *threadSafeSlice) Push(w *worker) {
slice.Lock()
defer slice.Unlock()

workers = append(workers, w)
}

func (slice *threadSafeSlice) Iter(routine func(*worker)) {
slice.Lock()
defer slice.Unlock()

for _, worker := range workers {
routine(worker)
}
}

无论何时你想启动一个 worker :

w := &worker{}
w.Start()
threadSafeSlice.Push(w)

您的调度员将更改为:

go func() {
for {
msg := <- ch
threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
}
}()

遗言:永远不要离开一个悬空的 goroutine

其中一个好的做法是:永远不要离开悬空的 goroutine。所以当你听完之后,你需要关闭所有你触发的 goroutine。这将通过 worker 中的 quit channel 完成:

首先我们需要创建一个全局的quit信号 channel :

globalQuit := make(chan struct{})

每当我们创建一个worker时,我们都会将globalQuit channel 分配给它作为它的退出信号:

worker.quit = globalQuit

然后,当我们想要关闭所有工作人员时,我们只需这样做:

close(globalQuit)

由于 close 会被所有监听的 goroutine 识别(这是你理解的一点),所有的 goroutine 都会被返回。记得关闭你的调度程序例程,但我会把它留给你:)

关于go - 如何使用 channel 广播消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36417199/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com