gpt4 book ai didi

go - 如何处理可以无阻塞增长的队列

转载 作者:数据小太阳 更新时间:2023-10-29 03:10:57 25 4
gpt4 key购买 nike

如果队列可以从处理函数本身增长,我正在尝试了解如何在 Go 中处理队列。请参见下面的代码。

在这个伪代码中,我想将我创建的处理程序数量限制为 10。因此我创建了 10 个处理队列的处理程序。然后我用一个 url 开始排队。

我的问题是,根据文档, channel 的 sender 将阻塞,直到接收者接收到数据。在下面的代码中,每个进程都是一个处理新 url 的接收器。然而,很容易看出,如果一个进程向队列发送 11 个链接,它将阻塞,直到所有接收者都处理完这些新链接。如果这些接收者每个都有 1 个链接,那么它们也会在将新的 1 个链接发送到队列时阻塞。由于每个人都被阻止,所以什么都没有完成。

我想知道 go 的一般解决方案是什么,用于处理可以从进程本身增长的队列。请注意,我认为我可以通过锁定名为 queue 的数组来执行此操作,但我正在尝试了解如何使用 channel 完成此操作。

var queue = make(chan string)

func process(){
for currentURL := range queue {
links, _ := ... // some http call that gets links from a url
for _, link := links {
queue <- link
}
}
}

func main () {
for i :=0; i < 10; i++ {
go process()
}

queue <- "https://stackoverflow.com"
...
// block until receive some quit message
<-quit
}

最佳答案

您可以使用的一种简单方法是将用于将链接添加到 channel 的代码移动到它自己的 go 例程中。这样,您的主要处理可以继续,而阻塞的 channel 写入将阻塞一个单独的 go 例程。

func process(){
for currentURL := range queue {
links, _ := ... // some http call that gets links from a url
for _, link := links {
l := link // this is important! ...
// the loop will re-set the value of link before the go routine is started

go func(l) {
queue <- link // we'll be blocked here...
// but the "parent" routine can still iterate through the channel
// which in turn un-blocks the write
}(l)
}
}
}

使用信号量示例编辑以限制 go 例程:

func main () {
maxWorkers := 5000
sem := semaphore.NewWeighted(int64(maxWorkers))
ctx := context.TODO()
for i :=0; i < 10; i++ {
go process(ctx)
}

queue <- "https://stackoverflow.com"
// block until receive some quit message
<-quit
}

func process(ctx context.Context){
for currentURL := range queue {
links, _ := ... // some http call that gets links from a url
for _, link := links {
l := link // this is important! ...
// the loop will re-set the value of link before the go routine is started

// acquire a go routine...
// if we are at the routine limit, this line will block until one becomes available
sem.Acquire(ctx, 1)
go func(l) {
defer sem.Release(1)
queue <- link // we'll be blocked here...
// but the "parent" routine can still iterate through the channel
// which in turn un-blocks the write
}(l)
}
}
}

尽管此选项最终可能导致死锁...假设所有 go 例程都已声明,父循环可能会锁定在 sem.Acquire 上。这将导致子例程永远不会添加到 channel ,因此永远不会执行延迟的 sem.Release。在我的脑海中,我正在努力想出一个很好的方法来处理这个问题。也许是外部内存队列而不是 channel ?

关于go - 如何处理可以无阻塞增长的队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51911766/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com