gpt4 book ai didi

go - 通过缓冲 channel (Golang) 限制并发执行进程的数量

转载 作者:IT王子 更新时间:2023-10-29 01:57:19 36 4
gpt4 key购买 nike

意图:

我正在寻找一种并行运行操作系统级 shell 命令的方法,但要注意不要破坏 CPU,并且想知道缓冲 channel 是否适合这种用例。

已实现:

创建一系列具有模拟运行时间的Job。将这些作业发送到一个队列,该队列将调度它们以通过缓冲 channel 运行,受EXEC_THROTTLE限制。

观察:

这“有效”(在编译和运行的范围内),但我想知道缓冲区是否按规定工作(参见:“意图”)以限制并行运行的进程数。

免责声明:

现在,我知道新手往往会过度使用 channel ,但我觉得这种洞察力请求是诚实的,因为我至少已经克制地使用了 sync.WaitGroup。原谅有点玩具的例子,但所有的见解将不胜感激。

Playground

package main

import (
// "os/exec"
"log"
"math/rand"
"strconv"
"sync"
"time"
)

const (
EXEC_THROTTLE = 2
)

type JobsManifest []Job

type Job struct {
cmd string
result string
runtime int // Simulate long-running task
}

func (j JobsManifest) queueJobs(logChan chan<- string, runChan chan Job, wg *sync.WaitGroup) {
go dispatch(logChan, runChan)
for _, job := range j {
wg.Add(1)
runChan <- job
}
}

func dispatch(logChan chan<- string, runChan chan Job) {
for j := range runChan {
go run(j, logChan)
}
}

func run(j Job, logChan chan<- string) {
time.Sleep(time.Second * time.Duration(j.runtime))
j.result = strconv.Itoa(rand.Intn(10)) // j.result = os.Exec("/bin/bash", "-c", j.cmd).Output()
logChan <- j.result
log.Printf(" ran: %s\n", j.cmd)
}

func logger(logChan <-chan string, wg *sync.WaitGroup) {
for {
res := <-logChan
log.Printf("logged: %s\n", res)
wg.Done()
}
}

func main() {

jobs := []Job{
Job{
cmd: "ps -p $(pgrep vim) | tail -n 1 | awk '{print $3}'",
runtime: 1,
},
Job{
cmd: "wc -l /var/log/foo.log | awk '{print $1}'",
runtime: 2,
},
Job{
cmd: "ls -l ~/go/src/github.com/ | wc -l | awk '{print $1}'",
runtime: 3,
},
Job{
cmd: "find /var/log/ -regextype posix-extended -regex '.*[0-9]{10}'",
runtime: 4,
},
}

var wg sync.WaitGroup
logChan := make(chan string)
runChan := make(chan Job, EXEC_THROTTLE)
go logger(logChan, &wg)

start := time.Now()
JobsManifest(jobs).queueJobs(logChan, runChan, &wg)
wg.Wait()
log.Printf("finish: %s\n", time.Since(start))
}

最佳答案

您还可以使用缓冲 channel 限制并发:

concurrencyLimit := 2 // Number of simultaneous jobs.
semaphore := make(chan struct{}, concurrencyLimit)
for job := range jobs {
job := job // Pin loop variable.
semaphore <- struct{}{} // Acquire semaphore slot.
go func() {
defer func() {
<-semaphore // Release semaphore slot.
}()

do(job) // Do the job.
}()
}
// Wait for goroutines to finish by acquiring all slots.
for i := 0; i < cap(semaphore); i++ {
semaphore <- struct{}{}
}

关于go - 通过缓冲 channel (Golang) 限制并发执行进程的数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48053950/

36 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com