gpt4 book ai didi

go - 实现具有多个生产者的信号量(使用 goroutines)

转载 作者:数据小太阳 更新时间:2023-10-29 03:25:51 25 4
gpt4 key购买 nike

这一直是我存在的祸根。

type ec2Params struct {
sess *session.Session
region string
}

type cloudwatchParams struct {
cl cloudwatch.CloudWatch
id string
metric string
region string
}

type request struct {
ec2Params
cloudwatchParams
}

// Control concurrency and sync
var maxRoutines = 128
var sem chan bool
var req chan request

func main() {
sem := make(chan bool, maxRoutines)
for i := 0; i < maxRoutines; i++ {
sem <- true
}
req := make(chan request)
go func() { // This is my the producer
for _, arn := range arns {
arnCreds := startSession(arn)
for _, region := range regions {
sess, err := session.NewSession(
&aws.Config{****})
if err != nil {
failOnError(err, "Can't assume role")
}
req <- request{ec2Params: ec2Params{ **** }}
}
}
}()
for f := range(req) {
<- sem
if (ec2Params{}) != f.ec2Params {
go getEC2Metrics(****)
} else {
// I should be excercising this line of code too,
// but I'm not :(
go getMetricFromCloudwatch(****)
}
sem <- true
}
}

getEC2MetricsgetCloudwatchMetrics 是要执行的 goroutines

func getMetricFromCloudwatch(cl cloudwatch.CloudWatch, id, metric, region string) {
// Magic
}

func getEC2Metrics(sess *session.Session, region string) {
ec := ec2.New(sess)
var ids []string
l, err := ec.DescribeInstances(&ec2.DescribeInstancesInput{})
if err != nil {
fmt.Println(err.Error())
} else {
for _, rsv := range l.Reservations {
for _, inst := range rsv.Instances {
ids = append(ids, *inst.InstanceId)
}
}
metrics := cfg.AWSMetric.Metric
if len(ids) >= 0 {
cl := cloudwatch.New(sess)
for _, id := range ids{
for _, metric := range metrics {
// For what I can tell, execution get stuck here
req <- request{ cloudwatchParams: ***** }}
}
}
}
}
}

maingetEC2Metrics 中的匿名生产者都应该异步地向 req 发布数据,但到目前为止,它似乎是 getEC2Metrics 正在发布到 channel ,永远不会被处理。似乎有什么东西阻止我在 goroutine 中发布,但我什么也没找到。我很想知道如何解决这个问题并产生预期的行为(这是一个实际工作的信号量)。

实现的基础可以在这里找到:https://burke.libbey.me/conserving-file-descriptors-in-go/

最佳答案

我很着急,JimB 的评论使轮子旋转,现在我已经解决了这个问题!

// Control concurrency and sync
var maxRoutines = 128
var sem chan bool
var req chan request // Not reachable inside getEC2Metrics

func getEC2Metrics(sess *session.Session, region string, req chan <- request ) {
....
....
for _, id := range ids{
for _, metric := range metrics {
req <- request{ **** }} // When using the global req,
// this would block
}
}
....
....
}

func main() {
sem := make(chan bool, maxRoutines)
for i := 0; i < maxRoutines; i++ {
sem <- true
}
req := make(chan request)
go func() {
// Producing tasks
}()
for f := range(req) {
<- sem // checking out tickets outside the goroutine does block
//outside of the goroutine
go func() {
defer func() { sem <- true }()
if (ec2Params{}) != f.ec2Params {
getEC2Metrics(****, req) // somehow sending the channel makes
// possible to publish to it
} else {
getMetricFromCloudwatch(****)
}
}()
}
}

有两个问题:

  1. 信号量没有锁定(我认为这是因为我在 goroutine 中 check out 和放入 token ,所以可能存在竞争条件)。
  2. 出于某种原因,getEC2Metrics 未正确处理全局 channel 请求,因此在尝试发布到显然在范围内的 channel 时,所有 goroutine 都会卡住,但它不是't(我真的不知道为什么)。

老实说,我对第二个项目很幸运,到目前为止我还没有找到任何关于这个怪癖的文档,但最后我很高兴它能正常工作。

关于go - 实现具有多个生产者的信号量(使用 goroutines),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42147854/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com