
Go worker pool that limits both the number of goroutines and the time each computation may take


I have a function that should create at most N goroutines, each of which reads from a jobs channel and does some computation. The catch is that if a computation takes longer than X, it should be abandoned and the worker should move on to the next job.

func doStuff() {
    rules := []string{
        "a",
        "b",
        "c",
        "d",
        "e",
        "f",
        "g",
    }
    var (
        jobs    = make(chan []string, len(rules))
        res     = make(chan string, len(rules))
        matches []string
    )

    // Worker: reads jobs and reports the id of every rule that matches.
    w := func(jobs <-chan []string, results chan<- string) {
        for j := range jobs {
            k, id := j[0], j[1]
            if id == "c" || id == "e" {
                // Simulate a slow computation.
                time.Sleep(time.Second * 5)
            }
            if match(k, id) {
                results <- id
            }
        }
    }

    // Start N workers.
    N := 2
    for i := 0; i < N; i++ {
        go w(jobs, res)
    }

    // Enqueue all jobs and close the channel so the workers can exit.
    for _, rl := range rules {
        jobs <- []string{"a", rl}
    }
    close(jobs)

    // Collect the results, waiting at most one second for each.
    for i := 0; i < len(rules); i++ {
        select {
        case m := <-res:
            matches = append(matches, m)
        case <-time.After(time.Second):
        }
    }
    fmt.Println(matches)
}

The expected result is:

[a, b, d, f, g]

But what I actually get is:

[a, b, d]

It looks like reading from the results channel finishes before the goroutines that are delayed by the sleep have completed. So I added a context with a deadline, but now it hangs indefinitely:

    w := func(jobs <-chan []string, results chan<- string) {
        for j := range jobs {
            // One deadline per job. Note that defer inside the loop means the
            // cancel functions only run once the worker function returns.
            ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*2))
            defer cancel()
            k, id := j[0], j[1]
            if id == "c" || id == "e" {
                time.Sleep(time.Second * 5)
            }
            if !match(k, id) {
                continue
            }
            select {
            case results <- id:
            case <-ctx.Done():
                fmt.Println("Canceled by timeout")
                continue
            }
        }
    }

I have read other questions about killing a goroutine entirely on timeout, but I couldn't find anything about simply skipping a result when it times out.
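
For reference, here is a minimal standard-library sketch of that "skip and move on" behaviour. The helper runWithTimeout and the worker function are hypothetical names introduced here, and match is assumed to keep its original func(key, id string) bool signature. Each computation runs in its own goroutine and the worker stops waiting for it after a fixed budget; the abandoned goroutine keeps running in the background until it returns, it is simply no longer waited on.

// runWithTimeout runs fn in its own goroutine and waits at most d for the
// result. On timeout it reports ok == false; the abandoned goroutine keeps
// running until fn returns, it is just no longer waited on.
func runWithTimeout(d time.Duration, fn func() bool) (result, ok bool) {
    done := make(chan bool, 1) // buffered, so a late sender never blocks
    go func() { done <- fn() }()
    select {
    case r := <-done:
        return r, true
    case <-time.After(d):
        return false, false
    }
}

// worker is the loop from the question, but every computation gets a
// one-second budget; the slow rules ("c" and "e") are skipped.
func worker(jobs <-chan []string, results chan<- string) {
    for j := range jobs {
        k, id := j[0], j[1]
        m, ok := runWithTimeout(time.Second, func() bool {
            if id == "c" || id == "e" {
                time.Sleep(time.Second * 5) // simulated slow computation
            }
            return match(k, id)
        })
        if ok && m {
            results <- id
        }
    }
}

With per-job timeouts, a skipped job never sends anything, so the collecting side should close the results channel once all workers are done (for example with a sync.WaitGroup) and range over it, rather than counting on exactly len(rules) receives.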

Best answer

I made a package for use cases like this. Take a look at this repository: github.com/MicahParks/ctxerrgroup.

Here is a complete example of what your code could look like using the package and streaming the results. The streaming approach is more memory efficient: the original approach keeps every result in memory until they are all printed at the end.

package main

import (
    "context"
    "log"
    "time"

    "github.com/MicahParks/ctxerrgroup"
)

func main() {

    // The number of worker goroutines to use.
    workers := uint(2)

    // Create an error handler that logs all errors.
    //
    // The original work item didn't return an error, so this is not required.
    var errorHandler ctxerrgroup.ErrorHandler
    errorHandler = func(_ ctxerrgroup.Group, err error) {
        log.Printf("A job in the worker pool failed.\nError: %s", err.Error())
    }

    // Create the group of workers.
    group := ctxerrgroup.New(workers, errorHandler)

    // Create the question specific assets.
    rules := []string{
        "a",
        "b",
        "c",
        "d",
        "e",
        "f",
        "g",
    }
    results := make(chan bool)

    // Create a parent timeout.
    timeout := time.Second
    parentTimeout, parentCancel := context.WithTimeout(context.Background(), timeout)
    defer parentCancel()

    // Iterate through all the rules to use.
    for _, rule := range rules {

        // Create a child context for this specific work item.
        ctx, cancel := context.WithCancel(parentTimeout)

        // Deliberately shadow the rule so the next iteration doesn't take over.
        rule := rule

        // Create and add the work item.
        group.AddWorkItem(ctx, cancel, func(workCtx context.Context) (err error) {

            // Do the work using the workCtx.
            results <- match(workCtx, "a", rule)

            return nil
        })
    }

    // Launch a goroutine that will close the results channel when everyone is finished.
    go func() {
        group.Wait()
        close(results)
    }()

    // Print the matches as they happen. This will not hang.
    for result := range results {
        log.Println(result)
    }

    // Wait for the group to finish.
    //
    // This is not required, but doesn't hurt as group.Wait is idempotent. It's here in case you remove the goroutine
    // waiting and closing the channel above.
    group.Wait()
}

// match is a function from the original question. It now accepts and properly uses the context argument.
func match(ctx context.Context, key, id string) bool {
    panic("implement me")
}
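
The stub above leaves open what "properly uses the context" means. One possible shape, sketched here with a placeholder matching rule, is to wait on the slow part and on ctx.Done at the same time, giving up as soon as the deadline passes:

// matchWithContext is one hypothetical implementation of the stub above: it
// abandons the slow part as soon as the context is cancelled or its deadline
// passes, instead of blocking for the full five seconds.
func matchWithContext(ctx context.Context, key, id string) bool {
    if id == "c" || id == "e" {
        select {
        case <-time.After(time.Second * 5): // simulated slow computation
        case <-ctx.Done():
            return false // deadline exceeded or cancelled: report no match
        }
    }
    return key == "a" // placeholder matching rule
}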

Regarding a Go worker pool that limits both the number of goroutines and the computation time, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/66592386/
