- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我是新手,我正在尝试创建一个简单的聊天服务器,客户端可以在其中向所有连接的客户端广播消息。
在我的服务器中,我有一个接受连接的 goroutine(无限循环),并且所有连接都由 channel 接收。
go func() {
for {
conn, _ := listener.Accept()
ch <- conn
}
}()
然后,我为每个连接的客户端启动一个处理程序(goroutine)。在处理程序内部,我尝试通过迭代 channel 来广播到所有连接。
for c := range ch {
conn.Write(msg)
}
但是,我无法广播,因为(我认为通过阅读文档) channel 需要在迭代之前关闭。我不确定何时应该关闭 channel ,因为我想不断接受新的连接,而关闭 channel 不会让我这样做。如果有人可以帮助我,或者提供一种更好的方式向所有连接的客户端广播消息,将不胜感激。
最佳答案
你正在做的是一个扇出模式,也就是说,多个端点正在监听一个输入源。这种模式的结果是,只要输入源中有消息,这些监听器中只有一个能够获取消息。唯一的异常(exception)是 channel 的 close
。此 close
将被所有听众识别,因此是“广播”。
但是你想做的是广播一条从连接中读取的消息,所以我们可以这样做:
让每个worker监听专用的广播 channel ,并将消息从主 channel 分发到每个专用的广播 channel 。
type worker struct {
source chan interface{}
quit chan struct{}
}
func (w *worker) Start() {
w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
go func() {
for {
select {
case msg := <-w.source
// do something with msg
case <-quit: // will explain this in the last section
return
}
}
}()
}
然后我们可以有一堆 worker :
workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }
然后启动我们的监听器:
go func() {
for {
conn, _ := listener.Accept()
ch <- conn
}
}()
还有一个调度员:
go func() {
for {
msg := <- ch
for _, worker := workers {
worker.source <- msg
}
}
}()
在这种情况下,上面给出的解决方案仍然有效。唯一的区别是,当你需要一个新的worker时,你需要创建一个新的worker,启动它,然后将它插入workers
slice。但是这种方法需要一个线程安全的 slice ,它需要一个锁。其中一种实现可能如下所示:
type threadSafeSlice struct {
sync.Mutex
workers []*worker
}
func (slice *threadSafeSlice) Push(w *worker) {
slice.Lock()
defer slice.Unlock()
workers = append(workers, w)
}
func (slice *threadSafeSlice) Iter(routine func(*worker)) {
slice.Lock()
defer slice.Unlock()
for _, worker := range workers {
routine(worker)
}
}
无论何时你想启动一个 worker :
w := &worker{}
w.Start()
threadSafeSlice.Push(w)
您的调度员将更改为:
go func() {
for {
msg := <- ch
threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
}
}()
其中一个好的做法是:永远不要离开悬空的 goroutine。所以当你听完之后,你需要关闭所有你触发的 goroutine。这将通过 worker
中的 quit
channel 完成:
首先我们需要创建一个全局的quit
信号 channel :
globalQuit := make(chan struct{})
每当我们创建一个worker时,我们都会将globalQuit
channel 分配给它作为它的退出信号:
worker.quit = globalQuit
然后,当我们想要关闭所有工作人员时,我们只需这样做:
close(globalQuit)
由于 close
会被所有监听的 goroutine 识别(这是你理解的一点),所有的 goroutine 都会被返回。记得关闭你的调度程序例程,但我会把它留给你:)
关于go - 如何使用 channel 广播消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36417199/
我很好奇为什么以下不起作用。一般select用default:防止死锁,但在这种情况下不是: package main import "fmt" func main () { a := mak
我一生都无法弄清楚如何切换图像排序。图像以 (x,x,3) 格式读取,theano 要求它是 (3,x,x) 格式。我尝试更改顺序numpy.array([img[:,:,i] for i in ra
我正在向 libnl 发送单个 SSID 和频率进行扫描,但我得到了多个扫描结果以及我请求的 SSID 和频率,但我需要单个扫描结果(仅适用于请求的 SSID),如何实现这一点。请帮助我,我也发送了我
我是 Golang 的新手,但正在努力理解这门伟大的语言!请帮帮我.. 我有 2 个 channel 。 “进”和“出” channel in, out := make(chan Work),
例如我有这段代码: package main import ( "fmt" ) func main() { c1 := make(chan interface{}) close
我们使用以下调用来获取经过身份验证的用户的 ChannelID,它适用于大多数情况。一些 YouTube 用户将他们的 channel 连接到 Google+ 信息页,但在这种情况下,我们的一位用户无
case 'sinfo': const sinfo = new Discord.MessageEmbed() .addField('Server Name 🔎 :', message.guild.n
我需要让所有 channel 来创建一个 bunker 命令,这使得所有 channel 都是只读的。 最佳答案 他们变了Client.servers至 Client.guilds在 newer ve
为什么当第二个值通过另一个 go routine 发送并且没有收到发送的第一个值时, channel c 没有缓冲? package main import "fmt" func sum(s []in
据我所知,内置的 split 会将一个 3 channel Mat 拆分为三个 1 channel Mat。结果,这三个 Mat 只是具有一些不同强度的灰度。 我的意图是获得三个 3 channel
如何检测当前的 RAM 配置?我需要询问 Windows RAM 当前是在单 channel 、双 channel 还是四 channel 中运行。 我搜索了很多,并没有在这个网站或其他网站上找到任何
我需要拆分一个多 channel wav 文件并将每个 channel 编码为 mp3 文件。 我知道 gtresamer 的 deinterleave 插件,但我不确定如何将它用于 wav 文件以及
关闭。这个问题需要details or clarity .它目前不接受答案。 想要改进这个问题吗? 通过 editing this post 添加详细信息并澄清问题. 关闭 8 年前。 Improve
我正在尝试运行 Hyperledger Fabric 网络,它由单个订购者、单个对等节点和一个 cli 组成。为了学习启动 Hyperledger Fabric 网络的过程,从创建与加密相关的工件到将
我在 Laravel 中使用事件广播。我正在使用基于角色的通知访问权限。我有用于广播的自定义 auth guard。当用户连接到 channel 时,客户端将具有内部权限的 access_token
我正在编写一个使用 Elixir Channels 来处理实时事件的应用程序。我知道每个客户端将打开 1 个套接字,并且可以在其上多路复用多个 channel 。所以我的应用程序是一个聊天应用程序,其
我有一些 .wav 文件,我想转换它们的频率 (fs) 和 channel 数 (nchannels)。我在jupyter笔记本python3.6上使用ffmpeg。我使用了以下命令并且它有效。 cm
我有一个视频渲染器,它需要两个 H265 流(YUV420),我需要烘焙它们以使它们中的一个与另一个形成 alpha 蒙版。这一切都已解决并且效果很好,但是如果我按照此处的说明进行操作: ffmpeg
我运行此命令以便能够将 udp 直播流传输到可使用正在构建的移动应用程序播放的 http 直播流。 它只是一个只有音频流的流。 ffmpeg -i udp://@localhost:1111 -map
我在我的 discord.js 机器人中创建了 nuke 命令,它创建了具有相同名称、权限、主题等的 channel ,并删除了“原始” channel 。但是有一个问题,如何使 channel 与“
我是一名优秀的程序员,十分优秀!