gpt4 book ai didi

go - 多个 goroutines 在一个 channel 上有选择地监听

转载 作者:行者123 更新时间:2023-12-05 09:04:40 27 4
gpt4 key购买 nike

我看过this , this , thisthis但在这种情况下没有人真正帮助我。如果 channel 中的值是针对特定 goroutine,我有多个 goroutine 需要执行某些任务。

var uuidChan chan string

func handleEntity(entityUuid string) {
go func() {
for {
select {
case uuid := <-uuidChan:
if uuid == entityUuid {
// logic
println(uuid)
return
}
case <-time.After(time.Second * 5):
println("Timeout")
return
}
}
}()
}

func main() {
uuidChan = make(chan (string))
for i := 0; i < 5; i++ {
handleEntity(fmt.Sprintf("%d", i))
}
for i := 0; i < 4; i++ {
uuidChan <- fmt.Sprintf("%d", i)
}
}

https://play.golang.org/p/Pu5MhSP9Qtj

在上述逻辑中,uuid 被其中一个 channel 接收,但没有任何反应。为了解决这个问题,如果某些 uuid 的逻辑不在该例程中,我尝试更改逻辑以将 uuid 重新插入回 channel 。我知道这是一种不好的做法,而且也行不通。

func handleEntity(entityUuid string) {
go func() {
var notMe []string // stores list of uuids that can't be handled by this routine and re-inserts it in channel.
for {
select {
case uuid := <-uuidChan:
if uuid == entityUuid {
// logic
println(uuid)
return
} else {
notMe = append(notMe, uuid)
}
case <-time.After(time.Second * 5):
println("Timeout")
defer func() {
for _, uuid := range notMe {
uuidChan <- uuid
}
}()
return
}
}
}()
}

https://play.golang.org/p/5On-Vd7UzqP

执行此操作的正确方法是什么?

最佳答案

您有一个盒子,里面有一个标签,因此接收者应该先阅读标签,然后再决定如何处理它。如果您将标签放在盒子内 - 您是在强制收货人打开盒子(请参阅解决方案编号 1)。我鼓励您提供更好的邮政服务,至少将标签放在箱子外面(参见解决方案编号 3)- 或者最好立即将箱子送到正确的地址(参见解决方案编号 2):

对此有很多解决方案,您只受限于您的想象力:
1.由于对于具有 ID 的消费者,您只有一个 channel ,其中包含一个带有 ID 的数据,并且您只能从该 channel 读取一次数据(假设 channel 内数据的顺序很重要) - 您有一个简单的解决方案:使用读取 goroutine 从 channel 读取数据,然后应用逻辑来决定如何处理这些数据 - 例如将它发送到另一个 goroutine 或运行一个任务。
尝试 this :

package main

import (
"fmt"
"sync"
"time"
)

func main() {
uuidChan := make(chan string)
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
t := time.NewTimer(5 * time.Second)
defer t.Stop()
for {
select {
case uuid, ok := <-uuidChan:
if !ok {
fmt.Println("Channel closed.")
return
}
// logic:
wg.Add(1)
// Multiple goroutines listening selectively on one channel
go consume(uuid, &wg)
// switch uuid {case 1: go func1(); case 2: go func2()}

case <-t.C:
fmt.Println("Timeout")
return
}
}
}()

for i := 0; i < 4; i++ {
uuidChan <- fmt.Sprintf("%d", i)
}
close(uuidChan) // free up the goroutine

wg.Wait() // wait until all consumers are done
fmt.Println("All done.")
}

// Multiple goroutines listening selectively on one channel
func consume(uuid string, wg *sync.WaitGroup) {
defer wg.Done()
// logic: or decide here based on uuid
fmt.Println("job #:", uuid) // job
}

输出:

job #: 0
job #: 2
job #: 1
Channel closed.
job #: 3
All done.

  1. 为每个 goroutine 使用一个 channel ,尝试 this :
package main

import (
"fmt"
"sync"
"time"
)

func handleEntity(uuidChan chan string, wg *sync.WaitGroup) {
defer wg.Done()
// for {
select {
case uuid, ok := <-uuidChan:
if !ok {
fmt.Println("closed")
return // free up goroutine on chan closed
}
fmt.Println(uuid)
return // job done

case <-time.After(1 * time.Second):
fmt.Println("Timeout")
return
}
// }
}

func main() {
const max = 5
slice := make([]chan string, max)
var wg sync.WaitGroup

for i := 0; i < max; i++ {
slice[i] = make(chan string, 1)

wg.Add(1)
go handleEntity(slice[i], &wg)
}

for i := 0; i < 4; i++ {
slice[i] <- fmt.Sprintf("%d", i) // send to the numbered channel
}

wg.Wait()
fmt.Println("All done.")
}

输出:

3
0
1
2
Timeout
All done.

  1. 使用labelsync.Cond的信号广播:
    所以我们有一个 box 并使用名为 label 的共享变量,我们在框的顶部添加了接收者的地址。这里使用名为 label 的共享资源,首先将框 label 设置为所需的 ID,然后使用信号广播通知所有监听的 goroutines 唤醒并检查 label 和时间来查看一个是否被寻址和过期然后所有返回到等待状态并且被寻址或过期的继续读取无缓冲 channel 或退出。然后使用 time.AfterFunc 发出剩余 goroutine 到期的信号,最后使用 wg.Wait() 让它们全部加入。请注意,第一个 c.Broadcast() 应该在 c.Wait() 之后调用 - 这意味着 goroutine 应该在第一次调用 c.Broadcast 之前运行(),所以一种方法是简单地使用另一个名为 w4wsync.WaitGroup 的缩写 wait for wait
package main

import (
"fmt"
"sync"
"time"
)

func handleEntity(entityUuid string) {
defer wg.Done()
t0 := time.Now()
var expired, addressed bool

w4w.Done()
m.Lock()
for !expired && !addressed {
c.Wait()
addressed = label == entityUuid
expired = time.Since(t0) > d
}
m.Unlock()

fmt.Println("id =", entityUuid, "addressed =", addressed, "expired =", expired)
if !expired && addressed {
uuid := <-uuidChan
fmt.Println("matched =", entityUuid, uuid)
}
fmt.Println("done", entityUuid)
}

func main() {
for i := 0; i < 5; i++ {
w4w.Add(1)
wg.Add(1)
go handleEntity(fmt.Sprintf("%d", i))
}
w4w.Wait()

time.AfterFunc(d, func() {
// m.Lock()
// label = "none"
// m.Unlock()
fmt.Println("expired")
c.Broadcast() // expired
})

for i := 0; i < 4; i++ {
m.Lock()
label = fmt.Sprintf("%d", i)
m.Unlock()
c.Broadcast() // notify all
uuidChan <- label
}

fmt.Println("...")
wg.Wait()
fmt.Println("all done")
}

var (
label string
uuidChan = make(chan string)
m sync.Mutex
c = sync.NewCond(&m)
w4w, wg sync.WaitGroup
d = 1 * time.Second
)

输出:

id = 0 addressed = true expired = false
matched = 0 0
done 0
id = 1 addressed = true expired = false
matched = 1 1
done 1
id = 2 addressed = true expired = false
matched = 2 2
done 2
id = 3 addressed = true expired = false
matched = 3 3
done 3
...
expired
id = 4 addressed = false expired = true
done 4
all done

关于go - 多个 goroutines 在一个 channel 上有选择地监听,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68388460/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com