gpt4 book ai didi

go - 关闭和发送到 channel 之间的竞争条件

转载 作者:行者123 更新时间:2023-12-01 22:26:43 25 4
gpt4 key购买 nike

我正在尝试使用工作池构建通用管道库。我为源、管道和接收器创建了一个接口(interface)。您会看到,管道的工作是从输入 channel 接收数据,对其进行处理,然后将结果输出到 channel 上。这是它的预期行为:

  • 从输入 channel 接收数据。
  • 将数据委托(delegate)给可用的工作人员。
  • 工作人员将结果发送到输出 channel 。
  • 所有工作人员完成后关闭输出 channel 。

  • func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
    var wg sync.WaitGroup
    out = make(chan interface{}, 100)
    go func() {
    for i := 1; i <= 100; i++ {
    go p.work(in, out, &wg)
    }
    wg.Wait()
    close(out)
    }()

    return
    }

    func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
    for j := range jobs {
    func(j Job) {
    defer wg.Done()
    wg.Add(1)

    res := doSomethingWith(j)

    out <- res
    }(j)
    }
    }


    但是,运行它可能会在不处理所有输入的情况下退出,或者会出现 send on closed channel 的 panic 。信息。使用 -race 构建源代码flag 在 close(out) 之间发出数据竞争警告和 out <- res .

    以下是我认为可能发生的情况。一旦许多 worker 完成了他们的工作,就会有一瞬间 wg的计数器达到零。因此, wg.Wait()完成并且程序继续到 close(out) .同时,job channel 还没有完成数据生成,这意味着一些工作人员仍在另一个 goroutine 中运行。由于 out channel 已经关闭,它会导致 panic 。

    WaitGroup 是否应该放在其他地方?还是有更好的方法来等待所有 worker 完成?

    最佳答案

    目前尚不清楚为什么每个工作需要一名 worker ,但如果你这样做了,你可以重组你的外循环设置(见下面未经测试的代码)。这种方式首先消除了对工作池的需求。

    但是,请始终执行 wg.Add在解雇任何 worker 之前。在这里,你正在剥离 100 个 worker :

    var wg sync.WaitGroup
    out = make(chan interface{}, 100)
    go func() {
    for i := 1; i <= 100; i++ {
    go p.work(in, out, &wg)
    }
    wg.Wait()
    close(out)
    }()

    因此,您可以这样做:
    var wg sync.WaitGroup
    out = make(chan interface{}, 100)
    go func() {
    wg.Add(100) // ADDED - count the 100 workers
    for i := 1; i <= 100; i++ {
    go p.work(in, out, &wg)
    }
    wg.Wait()
    close(out)
    }()

    请注意,您现在可以移动 wg本身进入剥离 worker 的 goroutine。如果你放弃让每个工作人员将工作作为新的 goroutine 分拆的想法,这可以让事情变得更干净。但是,如果每个 worker 都将衍生出另一个 goroutine,那么该 worker 本身也必须使用 wg.Add , 像这样:
    for j := range jobs {
    wg.Add(1) // ADDED - count the spun-off goroutines
    func(j Job) {
    res := doSomethingWith(j)

    out <- res
    wg.Done() // MOVED (for illustration only, can defer as before)
    }(j)
    }
    wg.Done() // ADDED - our work in `p.work` is now done

    也就是说,每个匿名函数都是 channel 的另一个用户,因此在启动新的 goroutine 之前增加用户 channel 计数( wg.Add(1))。当你读完输入 channel jobs , 调用 wg.Done() (也许是通过较早的 defer ,但我在这里最后展示了它)。

    思考这个问题的关键是 wg计算此时可以写入 channel 的事件 goroutine 的数量。只有当没有 goroutine 打算再写时,它才会变为零。 这样可以安全地关闭 channel 。

    考虑使用更简单的(但未经测试):
    func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
    out = make(chan interface{})
    var wg sync.WaitGroup
    go func() {
    defer close(out)
    for j := range in {
    wg.Add(1)
    go func(j Job) {
    res := doSomethingWith(j)
    out <- res
    wg.Done()
    }(j)
    }
    wg.Wait()
    }()
    return out
    }

    你现在有一个 goroutine 正在读取 in尽可能快地进行 channel ,在运行时分拆工作。每个传入的工作都会得到一个 goroutine,除非他们提前完成工作。没有池,每个作业只有一个 worker (与您的代码相同,只是我们淘汰了没有做任何有用的池)。

    或者,因为只有一些可用的 CPU,所以在开始时像以前一样分拆一些 goroutine,但让每个 goroutine 运行一个作业以完成,并交付其结果,然后返回阅读下一个作业:
    func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
    out = make(chan interface{})
    go func() {
    defer close(out)
    var wg sync.WaitGroup
    ncpu := runtime.NumCPU() // or something fancier if you like
    wg.Add(ncpu)
    for i := 0; i < ncpu; i++ {
    go func() {
    defer wg.Done()
    for j := range in {
    out <- doSomethingWith(j)
    }
    }()
    }
    wg.Wait()
    }
    return out
    }

    通过使用 runtime.NumCPU()读取作业的 worker 数量与运行作业的 CPU 数量一样多。那些是游泳池,他们一次只做一项工作。

    如果输出 channel 读取器结构良好(即不会导致管道阻塞),通常不需要缓冲输出 channel 。如果不是,则此处的缓冲深度会限制您可以“提前”完成多少工作,而无论谁正在使用结果。根据“提前工作”执行此操作的有用程度来设置它 - 不一定是 CPU 的数量,或预期作业的数量,或其他任何东西。

    关于go - 关闭和发送到 channel 之间的竞争条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59527245/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com