gpt4 book ai didi

go - 实现 worker 功能的管道

转载 作者:数据小太阳 更新时间:2023-10-29 03:21:38 24 4
gpt4 key购买 nike

我正在实现一个由多个通过 channel 连接的工作函数组成的管道。它们都得到 (in, out chan interface{}) 作为输入(每个函数接收前一个函数的 out 作为 in)

我不能保证 out 会在每个函数结束时关闭,所以我想知道我应该如何检查前一个函数是否完成了它工作。我从这样的事情开始:

func ExecutePipeline(jobs ...job) {
out := make(chan interface{}, 10)
for _, val := range jobs {
in := out
out := make(chan interface{})
go val(in, out)
}
}

我正在考虑以某种方式使用 WaitGroup 来使用函数的 goroutine 的结尾作为它已完成其工作并关闭其输出 channel 的指示器。
我该怎么做?

最佳答案

如果您的目的是沿着管道传播信号以在先前的管道阶段完成并且不会产生进一步的值时进行通信,您可以通过在每个管道阶段返回后关闭 channel 来同步执行此操作。以下代码通过包装每个管道 worker 的调用来实现:

func startWork(val job, in, out chan interface{}) {
val(in, out)
// out will be closed after val returns
close(out)
}

// Later, in ExecutePipeline, start your worker by calling startWork
func ExecutePipeline(jobs ...job) {
// ...
for _, val := range jobs {
// ...
go startWork(val, in, out)
}
}

避免多个 channel 关闭

I don't have any guarantee that out will be closed at the end of each function

相反,如果任何工作人员可以关闭一个 channel ,这是有问题的;如果您尝试关闭一个已经关闭的 channel ,随后在 startWork 中关闭 channel 的调用将会出现困惑。

在这个简单的实现中,工作人员必须将 channel 关闭委托(delegate)给监督管道的代码,以避免导致程序崩溃。


处理带内信号

由于信号是带内传递的(与数据在同一 channel 中),因此在实现管道工作人员时可能需要小心,以确保它们区分

  • 从开放 channel 读取值,以及
  • 从关闭的 channel 读取零值

rangefor 循环中遍历 channel 将在 channel 关闭时自动中断循环。如果您实现自己的逻辑以从 channel 中读取数据,您将需要确定读取何时因 channel 关闭而以零值简单地成功。这可以使用 multi-valued assignment form of the receive operator 来实现,当由于 channel 关闭且为空而从 channel 读取的值为零时,它将返回一个 bool 值。

func someWorker(in, out chan interface{}) {
for {
val, open := <-in
if !open {
// Read of val was the zero value of "in", indicating the channel
// is closed.
break // or, if appropriate, return
}
}
}

关于go - 实现 worker 功能的管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52802734/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com