gpt4 book ai didi

go - 使用 channel 同步多个 goroutine

转载 作者:IT王子 更新时间:2023-10-29 00:57:38 26 4
gpt4 key购买 nike

我需要启动一些具有单一任务队列和单一结果队列的工作人员。每个 worker 应该在不同的 goroutine 中启动。而且我需要等到所有工作人员都完成并且任务队列为空才能退出程序。我已经为 goroutine 同步准备了一个小例子。主要思想是我们计算队列中的任务并等待所有工作人员完成工作。但目前的实现有时会错过值(value)。为什么会发生这种情况以及如何解决问题?示例代码:

import (
"fmt"
"os"
"os/signal"
"strconv"
)

const num_workers = 5

type workerChannel chan uint64

// Make channel for tasks
var workCh workerChannel
// Make channel for task counter
var cntChannel chan int

// Task counter
var tskCnt int64

// Worker function
func InitWorker(input workerChannel, result chan string, num int) {
for {
select {
case inp := <-input:
getTask()
result <- ("Worker " + strconv.Itoa(num) + ":" + strconv.FormatUint(inp, 10))
}
}
}

// Function to manage task counter
// should be in uniq goroutine
func taskCounter(inp chan int) {
for {
val := <-inp
tskCnt += int64(val)
}
}

// Put pask to the queue
func putTask(val uint64) {
func() {
fmt.Println("Put ", val)
cntChannel <- int(1)
workCh <- val
}()
}

// Get task from queue
func getTask() {
func() {
cntChannel <- int(-1)
}()
}

func main() {
// Init service channels
abort := make(chan os.Signal)
done := make(chan bool)

// init queue for results
result := make(chan string)

// init task queue
workCh = make(workerChannel)

// start some workers
for i := uint(0); i < num_workers; i++ {
go InitWorker(workCh, result, int(i))
}

// init counter for synchro
cntChannel = make(chan int)
go taskCounter(cntChannel)

// goroutine that put some tasks into queue
go func() {
for i := uint(0); i < 21; i++ {
putTask(uint64(i))
}

// wait for processing all tasks and close application
for len(cntChannel) != 0 {}
for tskCnt != 0 {}
for len(workCh) != 0 {}
for len(result) != 0 {}

// send signal for close
done <- true
}()

signal.Notify(abort, os.Interrupt)
for {
select {
case <-abort:
fmt.Println("Aborted.")
os.Exit(0)

// print results
case res := <-result:
fmt.Println(res)

case <-done:
fmt.Println("Done")
os.Exit(0)
}
}
}

最佳答案

使用sync.WaitGroup等待 goroutines 完成。关闭 channel 以导致 channel 上的循环读取退出。

package main

import (
"fmt"
"sync"
)

type workerChannel chan uint64

const num_workers = 5

func main() {

results := make(chan string)
workCh := make(workerChannel)

// Start workers
var wg sync.WaitGroup
wg.Add(num_workers)
for i := 0; i < num_workers; i++ {
go func(num int) {
defer wg.Done()
// Loop processing work until workCh is closed
for w := range workCh {
results <- fmt.Sprintf("worker %d, task %d", num, w)
}

}(i)
}

// Close result channel when workers are done
go func() {
wg.Wait()
close(results)
}()

// Send work to be done
go func() {
for i := 0; i < 21; i++ {
workCh <- uint64(i)
}
// Closing the channel causes workers to break out of loop
close(workCh)
}()

// Process results. Loop exits when result channel is closed.
for r := range results {
fmt.Println(r)
}
}

https://play.golang.org/p/ZifpzsP6fNv

关于go - 使用 channel 同步多个 goroutine,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50313376/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com