gpt4 book ai didi

go - 如何与 channel 中的未决结果同步?

转载 作者:数据小太阳 更新时间:2023-10-29 03:17:30 26 4
gpt4 key购买 nike

我有一个工作池,它提供了一个同步接口(interface)来提取结果:

func (p *Pool) Get() *Result {
for {
select {
// if there are results in channel return them
case r := <-p.results:
return r
// else check if there is any work pending we must wait for
// if not return nil to indicate that all work was done
default:
if p.active < 1 {
return nil
}
}
}
}

想法是 Get 将返回下一个工作结果或 nil 如果所有工作都已完成。

现在这个实现的问题是我需要用 p.active 计数器手动跟踪所有事件的工作。这感觉有点不对劲,因为理论上信息已经位于 p.results channel 的长度中。

什么是在缓冲区为空时不返回任何内容的惯用方法?

最佳答案

遗憾的是没有 len(chan),如果您不知道 worker 的数量,您的方法就已经很好了。

但是您需要某种类型的计数器同步,这里有一个非常简单的方法:

type Result struct {
I int
}
type Pool struct {
res chan *Result
c int32
}

func New() *Pool {
return &Pool{
res: make(chan *Result),
}
}

func (p *Pool) Put(r *Result) {
atomic.AddInt32(&p.c, 1)
time.Sleep(time.Duration(100+r.I%1000) * time.Microsecond)
p.res <- r
}

func (p *Pool) Get() (r *Result) {
for {
select {
case r = <-p.res:
atomic.AddInt32(&p.c, -1)
return
default:
if atomic.LoadInt32(&p.c) == 0 {
return
}
}
}
}
func main() {
runtime.GOMAXPROCS(8)
p := New()
for i := 0; i < 50; i++ {
go p.Put(&Result{i})
}
time.Sleep(10 * time.Microsecond)
for {
r := p.Get()
if r == nil {
return
}
fmt.Println("r.I", r.I)
}
}

//编辑

为了完整起见,这是另一个使用 WaitGroup 的示例,但这又是一个矫枉过正,因为内部工作组无论如何都使用原子计数器。

type Pool struct {
res chan *Result
wg sync.WaitGroup
}

func New(n int) (p *Pool) {
p = &Pool{
res: make(chan *Result, n),
}
p.wg.Add(n)
go func() {
p.wg.Wait()
close(p.res)
}()
return
}

func (p *Pool) Get() *Result {
for {
r, ok := <-p.res
if !ok {
return nil
}
p.wg.Done()
return r

}
}

//func Put is the same as above and the test code is the same.

关于go - 如何与 channel 中的未决结果同步?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24723327/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com