gpt4 book ai didi

multithreading - Goroutines, channels select 语句

转载 作者:数据小太阳 更新时间:2023-10-29 03:24:40 25 4
gpt4 key购买 nike

我在组织 goroutine 和 channel 时遇到了问题。我的 select 语句总是在所有 goroutine 完成之前就退出了。我知道问题出在发送完成(done)信号的位置——我应该在哪里发送这个完成信号?

// startWorker collects everything received on ok and err until quit yields a
// value (a send or a close), then logs a summary and returns. It calls
// wg.Done when it returns.
//
// ok      successful results; each result's Lead is appended to BackFill.
// err     per-lead failures; each one is appended to BadLeads.
// quit    any receive on it ends the collection loop.
// verbose when true, the collected failures are passed to logBl before the
//         summary is logged.
//
// NOTE(review): select picks uniformly among ready cases, so if ok/err are
// buffered, values still queued when quit becomes ready may never be collected.
func startWorker(ok chan LeadRes, err chan LeadResErr, quit chan int, verbose bool, wg *sync.WaitGroup) {
	defer wg.Done()

	var collected ProcessResults
	log.Info("Starting . . .")
	began := time.Now()

	for {
		select {
		case res := <-ok:
			collected.BackFill = append(collected.BackFill, res.Lead)
		case failure := <-err:
			collected.BadLeads = append(collected.BadLeads, failure)
		case <-quit:
			if verbose {
				log.Info("Logging errors from unprocessed leads . . .")
				logBl(collected.BadLeads)
			}
			log.WithFields(log.Fields{
				"time-elapsed":                time.Since(began),
				"number-of-unprocessed-leads": len(collected.BadLeads),
				"number-of-backfilled-leads":  len(collected.BackFill),
			}).Info("Done")
			return
		}
	}
}

// BackFillParallel starts one ProcessLead goroutine per lead and then runs
// startWorker on the calling goroutine to collect what they send on gl/bl.
//
// NOTE(review): this is the racy version the question asks about. Only the
// goroutine started for the last element of leads gets done == true, and that
// call is the one that sends the quit signal on d. The goroutines run
// concurrently, so that call can finish first and make startWorker return
// while other ProcessLead calls still have sends pending on gl/bl (if those
// channels are unbuffered, such sends then block forever).
// NOTE(review): wg is never waited on. startWorker also defers wg.Done() on
// this same WaitGroup although no wg.Add(1) is made for it, so the counter can
// go negative, which makes package sync panic.
// NOTE(review): if leads is empty nothing ever sends on d, so startWorker
// presumably blocks forever; depends on what getChans returns, confirm.
func BackFillParallel(leads []Lead, verbose bool) {
var wg sync.WaitGroup
gl, bl, d := getChans()
for i, lead := range leads {
// done is true only for the final index; it does not mean "all others finished".
done := false
if len(leads)-1 == i {
done = true
}
wg.Add(1)
go func(lead Lead, done bool, wg *sync.WaitGroup) {
ProcessLead(lead, gl, bl, d, done, wg)
}(lead, done, &wg)

}
startWorker(gl, bl, d, verbose, &wg)
}

// ProcessLead decodes and repairs the metadata of every payload in lead, then
// sends the updated lead on c1. Per-payload failures are reported on c2 and
// the failing payload is skipped. When done is true, 0 is sent on c3 after the
// result has been sent on c1. wg.Done is called on return.
//
// Only payloads whose decoded metadata failed json.Unmarshal and were then
// repaired by FixMDStr end up in lead.Payload; payloads that already unmarshal
// cleanly are not appended. NOTE(review): confirm that dropping them is intended.
func ProcessLead(lead Lead, c1 chan LeadRes, c2 chan LeadResErr, c3 chan int, done bool, wg *sync.WaitGroup) {
	defer wg.Done()
	var payloads []Payload
	for _, p := range lead.Payload {
		decMDStr, err := base64.StdEncoding.DecodeString(p.MetaData)
		if err != nil {
			c2 <- LeadResErr{lead, err.Error()}
			// Skip the payload: otherwise the (possibly partial) output of the
			// failed decode would still be parsed and "fixed", which can report a
			// second error for the same payload or keep a corrupted one.
			continue
		}
		var decMetadata Metadata
		if err := json.Unmarshal(decMDStr, &decMetadata); err != nil {
			goodMetadata, err := FixMDStr(string(decMDStr))
			if err != nil {
				c2 <- LeadResErr{lead, err.Error()}
				// Do not keep a payload whose metadata could not be repaired.
				continue
			}
			p.MetaData = goodMetadata
			payloads = append(payloads, p)
		}
	}

	lead.Payload = payloads
	c1 <- LeadRes{lead}
	if done {
		c3 <- 0
	}
}

最佳答案

首先评论一下我在代码中看到的主要问题:

您把 done 变量传给了最后一次 ProcessLead 调用,而 ProcessLead 又根据这个变量通过 quit channel 让 worker 停止。问题在于,这些 ProcessLead 调用是并行执行的,所以“最后”一次调用完全可能在其他调用之前就完成。

第一次改进

把您的问题看作一条流水线(pipeline),它分为 3 个步骤:

  1. 遍历所有 lead,并为每个 lead 启动一个 goroutine
  2. 各个 goroutine 处理各自的 lead
  3. 收集结果

在第 2 步把工作分发(fan-out)给多个 goroutine 之后,最简单的同步方式是使用 WaitGroup。正如前面提到的,您从未调用 wg.Wait() 进行同步;而如果在当前结构中直接调用它,又会与收集结果的例程形成死锁。您需要再启动一个 goroutine,把同步(wg.Wait())与收集例程分开,才能正常工作。

代码大致如下(抱歉,我删掉了一部分代码,以便更清楚地展示结构):

// BackFillParallel processes every lead concurrently and prints each result
// and error via startWorker.
//
// A separate goroutine owns the WaitGroup: it starts one ProcessLead per lead,
// waits for all of them, and then closes d. Running wg.Wait() there (rather
// than on the calling goroutine) lets startWorker keep receiving from gl and
// bl in parallel, so the ProcessLead sends are never left without a receiver.
func BackFillParallel(leads []Lead, verbose bool) {
	gl, bl, d := make(chan LeadRes), make(chan LeadResErr), make(chan int)

	go func(d chan int) {
		var wg sync.WaitGroup
		// The index is not needed; binding it as `i` would not compile
		// because it is unused.
		for _, lead := range leads {
			wg.Add(1)
			go func(lead Lead, wg *sync.WaitGroup) {
				ProcessLead(lead, gl, bl, wg)
			}(lead, &wg)
		}
		wg.Wait()
		// gl and bl are unbuffered, so once every ProcessLead has returned each
		// of its sends has already been received by startWorker. If they are
		// ever given buffers, drain them before signalling quit.
		close(d) // closing broadcasts quit; it must happen exactly once
	}(d)

	// startWorker runs on this goroutine, concurrently with wg.Wait and close(d).
	startWorker(gl, bl, d, verbose)
}

// startWorker prints every value received on ok or err and returns as soon as
// quit yields a value (a send or a close). verbose is currently unused.
func startWorker(ok chan LeadRes, err chan LeadResErr, quit chan int, verbose bool) {
	for {
		// Case order is irrelevant: select chooses uniformly among ready cases.
		select {
		case <-quit:
			return
		case res := <-ok:
			fmt.Println(res)
		case e := <-err:
			fmt.Println(e)
		}
	}
}

// ProcessLead decodes and repairs the metadata of every payload in lead and
// sends the updated lead on c1. Per-payload failures are reported on c2 and
// the failing payload is skipped. wg.Done is called on return.
//
// Only payloads whose decoded metadata failed json.Unmarshal and were then
// repaired by FixMDStr end up in lead.Payload; payloads that already unmarshal
// cleanly are not appended. NOTE(review): confirm that dropping them is intended.
func ProcessLead(lead Lead, c1 chan LeadRes, c2 chan LeadResErr, wg *sync.WaitGroup) {
	defer wg.Done()
	var payloads []Payload
	for _, p := range lead.Payload {
		decMDStr, err := base64.StdEncoding.DecodeString(p.MetaData)
		if err != nil {
			c2 <- LeadResErr{lead, err.Error()}
			// Skip the payload: otherwise the (possibly partial) output of the
			// failed decode would still be parsed and "fixed", which can report a
			// second error for the same payload or keep a corrupted one.
			continue
		}
		var decMetadata Metadata
		if err := json.Unmarshal(decMDStr, &decMetadata); err != nil {
			goodMetadata, err := FixMDStr(string(decMDStr))
			if err != nil {
				c2 <- LeadResErr{lead, err.Error()}
				// Do not keep a payload whose metadata could not be repaired.
				continue
			}
			p.MetaData = goodMetadata
			payloads = append(payloads, p)
		}
	}

	lead.Payload = payloads
	c1 <- LeadRes{lead}
}

建议的解决方案

正如代码注释中提到的,如果您的 channel 带缓冲,可能会遇到麻烦。复杂性来自于您有两个输出 channel(分别用于 Lead 和 LeadErr)。可以用下面的结构避免这个问题:

// BackFillParallel processes every lead concurrently, collects all results and
// errors, and prints them once everything has finished.
//
// One goroutine fans out a ProcessLead call per lead, waits for all of them and
// then closes gl and bl. Closing is what ends the two collector loops below:
// without it their range loops never finish and wg.Wait blocks forever.
func BackFillParallel(leads []Lead, verbose bool) {
	gl, bl := make(chan LeadRes), make(chan LeadResErr)

	// Producer side: the ProcessLead calls are the only senders, so this
	// goroutine may safely close both channels after all of them return.
	go func(gl chan LeadRes, bl chan LeadResErr) {
		var wg sync.WaitGroup
		for _, lead := range leads {
			wg.Add(1)
			go func(lead Lead, wg *sync.WaitGroup) {
				ProcessLead(lead, gl, bl, wg)
			}(lead, &wg)
		}
		wg.Wait()
		close(gl)
		close(bl)
	}(gl, bl)

	// Consumer side: drain both channels concurrently, appending straight into
	// res and errs (each slice is written by exactly one goroutine). Handing
	// the slices by value to a helper would only grow the helper's local copy
	// and leave res and errs empty here.
	var wg sync.WaitGroup
	var res []LeadRes
	var errs []LeadResErr
	wg.Add(2)
	go func() {
		defer wg.Done()
		for lead := range gl {
			res = append(res, lead)
		}
	}()
	go func() {
		defer wg.Done()
		for err := range bl {
			errs = append(errs, err)
		}
	}()
	wg.Wait() // returns once both channels are closed and fully drained

	fmt.Println(res, errs) // res and errs now hold every result and error
}

// resCollector receives from ok until it is closed, appending each value to
// res, and calls wg.Done when it returns.
//
// NOTE(review): res is passed by value, so the appends only grow this
// function's local slice header; the caller's slice length never changes and
// the collected values are effectively discarded. Returning the slice or
// taking a *[]LeadRes would be needed to hand the results back.
func resCollector(wg *sync.WaitGroup, ok chan LeadRes, res []LeadRes) {
	defer wg.Done()
	for {
		lead, open := <-ok
		if !open {
			return
		}
		res = append(res, lead)
	}
}

// errCollector receives from ok until it is closed, appending each value to
// res, and calls wg.Done when it returns.
//
// NOTE(review): res is passed by value, so the appends only grow this
// function's local slice header; the caller's slice length never changes and
// the collected errors are effectively discarded. Returning the slice or
// taking a *[]LeadResErr would be needed to hand the errors back.
func errCollector(wg *sync.WaitGroup, ok chan LeadResErr, res []LeadResErr) {
	defer wg.Done()
	for {
		e, open := <-ok
		if !open {
			return
		}
		res = append(res, e)
	}
}

// ProcessLead function as in "First improvement"

关于multithreading - Goroutines, channels select 语句,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45271129/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com