gpt4 book ai didi

multithreading - 如何在 Go 中安全地关闭一个 chan chan T?

转载 作者:IT王子 更新时间:2023-10-29 02:10:53 26 4
gpt4 key购买 nike

我正在实现一个简单的工作池算法,其中 1 个 Sender(调度程序)将作业发送到 M(Worker)go 例程。为此,它使用一个 channel 的 channel 来为第一个空闲的工作人员分配一个可用的工作:

// builds the pool
func NewWorkerPool(maxWorkers int) WorkerPool {
pool := make(chan chan Job, maxWorkers)
workers := make([]Worker, 0)
return WorkerPool{
WorkerPool: pool,
Workers: workers,
maxWorkers: maxWorkers,
waitGroup: sync.WaitGroup{}}
}

// Starts the WorkerPool
func (p *WorkerPool) Run(queue chan Job) {
w := p.waitGroup

// starting n number of workers
for i := 0; i < p.maxWorkers; i++ {
worker := NewWorker(p.WorkerPool)
p.Workers = append(p.Workers, worker)
w.Add(1)
worker.Start(&w)
}

go p.dispatch(queue)
}

// dispatches a job to be handled by an idle Worker of the pool
func (p *WorkerPool) dispatch(jobQueue chan Job) {
for {
select {
case job := <-jobQueue:
// a model request has been received
go func(job Job) {
// try to obtain a worker model channel that is available.
// this will block until a worker is idle
jobChannel := <-p.WorkerPool

// dispatch the model to the worker model channel
jobChannel <- job
}(job)
}
}
}


// checks if a Worker Pool is open or closed - If we can recieve on the channel then it is NOT closed
func (p *WorkerPool) IsOpen() bool {
_, ok := <-p.WorkerPool
return ok
}

worker 的启动和停止方法

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start(wg *sync.WaitGroup) {
go func() {
defer wg.Done()
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel

select {
case job := <-w.JobChannel:
// we have received a work request.
result := job.Run()
job.ReturnChannel <- result

// once result is returned close the job output channel
close(job.ReturnChannel)

case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}

现在我尝试使用以下方法关闭池,我使用 sync.WaitGroup 以等待所有工作人员关闭:

// stops the Pool
func (p *WorkerPool) Stop() bool {
// stops all workers
for _, worker := range p.Workers {
worker.Stop()
}
p.waitGroup.Wait() //Wait for the goroutines to shutdown

close(p.WorkerPool)

more := p.IsOpen()

fmt.Printf(" more? %t", more)

return ok
}

//打印更多?真

即使我等待工作人员退出并稍后调用 close(p.WorkerPool) 我仍然打开 channel ,在这种情况下缺少什么,如何相应地关闭 channel ?

最佳答案

关闭 channel 表示不再向其发送任何值。这对于将完成情况传达给 channel 的接收者很有用。

channel 中的数据仍然存在,您可能必须关闭 channel 然后删除其中的所有 channel ,如下所示

// Stop stops the Pool and free all the channels
func (p *WorkerPool) Stop() bool {
// stops all workers
for _, worker := range p.Workers {
worker.Stop()
}
p.waitGroup.Wait() //Wait for the goroutines to shutdown
close(p.WorkerPool)
for channel := range p.WorkerPool {
fmt.Println("Freeing channel") //remove all the channels
}
more := p.IsOpen()
fmt.Printf(" more? %t", more)

return ok
}

顺便说一句,一个不能用_, ok <-检查 channel 是否关闭。我会建议函数的不同名称

关于multithreading - 如何在 Go 中安全地关闭一个 chan chan T?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47529636/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com