gpt4 book ai didi

go - 是否可以取消未完成的 goroutines?

转载 作者:行者123 更新时间:2023-12-05 01:52:00 26 4
gpt4 key购买 nike

考虑一组检查工作,每个检查工作都有独立的逻辑,因此它们似乎可以并发运行,例如:

type Work struct {
// ...
}

// This Check could be quite time-consuming
func (w *Work) Check() bool {
// return succeed or not

//...
}

func CheckAll(works []*Work) {
num := len(works)
results := make(chan bool, num)
for _, w := range works {
go func(w *Work) {
results <- w.Check()
}(w)
}

for i := 0; i < num; i++ {
if r := <-results; !r {
ReportFailed()
break;
}
}
}

func ReportFailed() {
// ...
}

当关注结果时,如果逻辑是无论哪个工作失败,我们都断言所有工作完全失败, channel 中剩余的值是无用的。让剩余未完成的 goroutine 继续运行并将结果发送到 channel 是没有意义和浪费的,尤其是当 w.Check() 相当耗时时。理想效果类似于:

    for _, w := range works {
if !w.Check() {
ReportFailed()
break;
}
}

这只运行必要的检查工作然后中断,但在顺序非并发场景中。

那么,是否可以取消这些未完成的goroutines,或者发送到channel?

最佳答案

取消(阻塞)发送

您最初的问题是询问如何取消发送操作。 channel 上的发送基本上是“即时的”。如果 channel 的缓冲区已满并且没有准备好的接收器,则 channel 上的发送将阻塞。

可以通过使用select 语句和您关闭的cancel channel “取消”此发送,例如:

cancel := make(chan struct{})

select {
case ch <- value:
case <- cancel:
}

在另一个 goroutine 上使用 close(cancel) 关闭 cancel channel 将使上面的选择放弃 ch 上的发送(如果它阻塞).

但如前所述,发送在“就绪” channel 上是“即时的”,发送首先评估要发送的值:

results <- w.Check()

这首先必须运行 w.Check(),一旦完成,它的返回值将在 results 上发送。

取消函数调用

所以你真正需要的是取消w.Check()方法调用。为此,惯用的方法是传递一个可以取消的 context.Context 值,并且 w.Check() 本身必须监视并“服从”这个取消请求。

参见 Terminating function execution if a context is cancelled

请注意,您的函数必须明确支持这一点。函数调用或 goroutines 没有隐式终止,参见 cancel a blocking operation in Go .

所以你的 Check() 应该看起来像这样:

// This Check could be quite time-consuming
func (w *Work) Check(ctx context.Context, workDuration time.Duration) bool {
// Do your thing and monitor the context!

select {
case <-ctx.Done():
return false
case <-time.After(workDuration): // Simulate work
return true
case <-time.After(2500 * time.Millisecond): // Simulate failure after 2.5 sec
return false
}
}

CheckAll() 可能如下所示:

func CheckAll(works []*Work) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

num := len(works)
results := make(chan bool, num)

wg := &sync.WaitGroup{}
for i, w := range works {
workDuration := time.Second * time.Duration(i)
wg.Add(1)
go func(w *Work) {
defer wg.Done()
result := w.Check(ctx, workDuration)
// You may check and return if context is cancelled
// so result is surely not sent, I omitted it here.
select {
case results <- result:
case <-ctx.Done():
return
}
}(w)
}

go func() {
wg.Wait()
close(results) // This allows the for range over results to terminate
}()

for result := range results {
fmt.Println("Result:", result)
if !result {
cancel()
break
}
}
}

测试它:

CheckAll(make([]*Work, 10))

输出(在 Go Playground 上尝试):

Result: true
Result: true
Result: true
Result: false

我们打印了 3 次 true(在 2.5 秒内完成),然后故障模拟开始,返回 false,并终止所有其他作业。

请注意,上面示例中的 sync.WaitGroup 并不是严格需要的,因为 results 有一个能够保存所有结果的缓冲区,但总的来说它仍然是一个很好的做法(你将来应该使用更小的缓冲区吗?

参见相关:Close multiple goroutine if an error occurs in one in go

关于go - 是否可以取消未完成的 goroutines?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71855079/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com