- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
假设我们想并行处理一些计算,但我们必须保证结果的顺序与计算的顺序相同:
这可以通过例如:
https://play.golang.org/p/jQbo0EVLzvX
package main
import (
"fmt"
"time"
)
func main() {
orderPutChans := make([]chan bool, 8)
orderGetChans := make([]chan bool, 8)
doneChans := make([]chan bool, 8)
for i := 0; i < 8; i++ {
orderPutChans[i] = make(chan bool, 1)
orderGetChans[i] = make(chan bool)
doneChans[i] = make(chan bool)
}
srcCh := make(chan int)
dstCh := make(chan int)
for i := 0; i < 8; i++ {
go func(j int) {
myGetCh := orderGetChans[j]
nextGetCh := orderGetChans[(j+1) % 8]
myPutCh := orderPutChans[j]
nextPutCh := orderPutChans[(j+1) % 8]
for {
_ = <- myGetCh
v, ok := <- srcCh
if !ok {
k := (j + 1) % 8
if orderGetChans[k] != nil {
orderGetChans[k] <- true
}
orderGetChans[j] = nil
break
}
nextGetCh <- true
time.Sleep(1000)
v *= v
_ = <- myPutCh
dstCh <- v
nextPutCh <- true
}
doneChans[j] <- true
}(i)
}
go func() {
for i := 0; i < 8; i++ {
_ = <- doneChans[i]
}
close(dstCh)
}()
orderGetChans[0] <- true
orderPutChans[0] <- true
go func() {
for i := 0; i < 100; i++ {
srcCh <- i
}
close(srcCh)
}()
for vv := range dstCh {
fmt.Println(vv)
}
}
可以使用 channel 传递 channel 的读/写权限。代码比较乱,看起来不是很整洁。 Go 中是否有更简洁的方法来实现这一目标?
编辑:我不是在要求“简单”的替换,例如使用 chan struct{}
或使用 close
在 doneChans
赞成doneChans[i] <- true
.
编辑2:
一个更简单的方法(至少就代码而言)是使用 results
数组,消费者将数据与索引(这将是 worker 的模数)一起发送,goroutines 将结果写入 results[j]
然后让 WaitGroup 等到所有操作都完成(使用一批中的一批),然后遍历结果并将它们发送到目标 channel 。 (可能是因为虚假分享,效果不是很好?)
最佳答案
如果我理解正确,这是您使用“管道”样式的代码版本。管道中有多个步骤:
这是代码,它使用您在对原始问题的编辑中提到的索引对样式。
type idxPair struct {
idx, val int
}
func main() {
// add a done channel, an ability to stop the world by closing this.
done := make(chan struct{})
defer close(done)
// create srcChan, this will be where the values go into the pipeline
srcCh := make(chan idxPair)
// create a slice of result channels, one for each of the go workers
const numWorkers = 8
resChans := make([]<-chan idxPair, numWorkers)
// waitgroup to wait for all the workers to stop
var wg sync.WaitGroup
wg.Add(numWorkers)
// start the workers, passing them each the src channel,
// collecting the result channels they return
for i := 0; i < numWorkers; i++ {
resChans[i] = worker(done, &wg, srcCh)
}
// start a single goroutine to send values into the pipeline
// all values are sent with an index, to be pieces back into order at the end.
go func() {
defer close(srcCh)
for i := 1; i < 100; i++ {
srcCh <- idxPair{idx: i, val: i}
}
}()
// merge all the results channels into a single results channel
// this channel is unordered.
mergedCh := merge(done, resChans...)
// order the values coming from the mergedCh according the the idxPair.idx field.
orderedResults := order(100, mergedCh)
// iterate over each of the ordered results
for _, v := range orderedResults {
fmt.Println(v)
}
}
func order(len int, res <-chan idxPair) []int {
results := make([]int, len)
// collect all the values to order them
for r := range res {
results[r.idx] = r.val
}
return results
}
func worker(done <- chan struct{}, wg *sync.WaitGroup, src <-chan idxPair) <-chan idxPair {
res := make(chan idxPair)
go func() {
defer wg.Done()
defer close(res)
sendValue := func(pair idxPair) {
v := pair.val
v *= v
ip := idxPair{idx: pair.idx, val: v}
select {
case res <- ip:
case <-done:
}
}
for v := range src{
sendValue(v)
}
}()
return res
}
// example and explanation here: https://blog.golang.org/pipelines
func merge(done <-chan struct{}, cs ...<-chan idxPair) <-chan idxPair {
var wg sync.WaitGroup
out := make(chan idxPair)
output := func(c <-chan idxPair) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
我认为这稍微干净一些,而不仅仅是“为了它而不同”的原因是:
order
阶段可以轻松优化,以便在收到值时通过 channel 发送值等。关于go - 序列化 goroutines(并行化但保证顺序),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51157337/
发件人:http://blog.nindalf.com/how-goroutines-work/ As the goroutines are scheduled cooperatively, a go
很多时候在用 Go 开发 http 服务器时,我都会遇到这种困境。 假设我想尽快用http statuscode 200响应客户端(然后在后面执行工作),这就是我通常这样做的原因: 我让我的主要 ht
这是代码: import "fmt" func main() { messages := make(chan string, 1) go func(c chan string) {
我正在学习 Golang,但遇到了一些困难。我已经研究过 Google,但没有任何进展。 我编写了一个代码,通过多台服务器的 ICMP 检查 RTT。 它有这样的结构: type Server str
我想运行多个 goroutine,进行一些处理,将结果放入 channel ,当至少有一个 goroutine 完成时,完成所有其他 goroutine 并从 channel 返回结果。 所以,我尝试
我有两个(但以后我会是三个)go 例程来处理来自远程服务器(来自 ampq channel )的传入消息。但是因为它们正在处理相同的数据/状态,所以我想阻止所有其他 go 例程,除了正在运行的例程。
我有一个案例,我从 2 个不同的位置(ES 和 REDIS)读取数据,我需要从最快的源读取一个值,因此我触发了 2 个 goroutines,一个从 ES 获取数据,其他从REDIS获取。 一旦从其中
像这里一样,我创建了一个 go playground 示例:sGgxEh40ev ,但无法正常工作。 quit := make(chan bool) res := make(chan int) go
我是golang的新手,正在研究goroutine。 我写了一个简单的代码,故意使用 goroutine 来划分数字。 首先,我给出基数并继续除它的数,直到它不能被整除 但是,我改变了go split
Main { go routine_1(carryout a time consuming task and return output) go routine_2(wait for output f
我想知道从另一个 goroutine 返回时调用的 goroutine 会发生什么。他们是继续运行还是被终止?这是一个示例代码来说明我的意思: func func() { // Doing s
更具体地说,在我的例子中,我有一个网络服务器和一个全局可访问的结构,网络服务器使用它来生成页面。我有另一个 Goroutine,它总是定期用新值更新该结构。这会引起问题吗?我是否需要实现一种机制来确保
来自 this file ,我不明白为什么函数startWorker会这样写: func (p *WorkerPool) dispatch() { for i := 0; i < p.maxW
我正在学习围棋,但在使用 goroutines 时遇到了问题。这是我的代码 package main import ( "fmt" "sync" "time" ) var co
我收到以下错误,我不明白为什么: 发送:查询 Herefatal 错误:所有 goroutines 都睡着了 - 死锁! 您可以看到我正在调用我使用 goroutine 创建的函数 routine。我
大家好,我正在从 Python3 过渡到 Go,所以我正在尝试重写我创建的库以获得更好的性能。 我面临一个问题,因为我是 Golang XD 中的新手,我使用有限的 API 下载数百个 json,我想
我有以下格式的脚本部分: func main() { for i=0;i<1000000;i++ { go test() } } func test() { a := test
package main func main() { c:=make(chan int) for i:=0; i<=100;i++ {
我正在学习 Go,我的第一个项目是一个简单的 ping 脚本。本质上,我想 ping 一堆 url,并在每个响应时等待 XXX 秒,然后再次 ping。这是删减的代码: func mai
这个问题在这里已经有了答案: Go all goroutines are asleep deadlock (2 个回答) fatal error: all goroutines are asleep
我是一名优秀的程序员,十分优秀!