gpt4 book ai didi

go - "fan in"- 一种 "fan out"行为

转载 作者:IT王子 更新时间:2023-10-29 00:45:22 24 4
gpt4 key购买 nike

比如说,我们有三种方法来实现“扇入”行为

func MakeChannel(tries int) chan int {
ch := make(chan int)

go func() {
for i := 0; i < tries; i++ {
ch <- i
}
close(ch)
}()

return ch
}

func MergeByReflection(channels ...chan int) chan int {
length := len(channels)
out := make(chan int)
cases := make([]reflect.SelectCase, length)
for i, ch := range channels {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
go func() {
for length > 0 {
i, line, opened := reflect.Select(cases)
if !opened {
cases[i].Chan = reflect.ValueOf(nil)
length -= 1
} else {
out <- int(line.Int())
}
}
close(out)
}()
return out
}

func MergeByCode(channels ...chan int) chan int {
length := len(channels)
out := make(chan int)
go func() {
var i int
var ok bool

for length > 0 {
select {
case i, ok = <-channels[0]:
out <- i
if !ok {
channels[0] = nil
length -= 1
}
case i, ok = <-channels[1]:
out <- i
if !ok {
channels[1] = nil
length -= 1
}
case i, ok = <-channels[2]:
out <- i
if !ok {
channels[2] = nil
length -= 1
}
case i, ok = <-channels[3]:
out <- i
if !ok {
channels[3] = nil
length -= 1
}
case i, ok = <-channels[4]:
out <- i
if !ok {
channels[4] = nil
length -= 1
}
}
}
close(out)
}()
return out
}

func MergeByGoRoutines(channels ...chan int) chan int {
var group sync.WaitGroup

out := make(chan int)
for _, ch := range channels {
go func(ch chan int) {
for i := range ch {
out <- i
}
group.Done()
}(ch)
}
group.Add(len(channels))
go func() {
group.Wait()
close(out)
}()
return out
}

type MergeFn func(...chan int) chan int

func main() {
length := 5
tries := 1000000
channels := make([]chan int, length)
fns := []MergeFn{MergeByReflection, MergeByCode, MergeByGoRoutines}

for _, fn := range fns {
sum := 0
t := time.Now()
for i := 0; i < length; i++ {
channels[i] = MakeChannel(tries)
}
for i := range fn(channels...) {
sum += i
}
fmt.Println(time.Since(t))
fmt.Println(sum)
}
}

结果是(在 1 个 CPU 下,我使用了 runtime.GOMAXPROCS(1)):
19.869s(通过反射合并)
2499997500000
8.483s (MergeByCode)
2499997500000
4.977s(MergeByGoRoutines)
2499997500000

结果是(在 2 个 CPU 下,我使用了 runtime.GOMAXPROCS(2)):
44.94s
2499997500000
10.853s
2499997500000
3.728s
2499997500000

  • 我明白 MergeByReflection 最慢的原因,但是 MergeByCode 和 MergeByGoRoutines 之间的区别是什么?
  • 当我们增加 CPU 数量时,为什么“select”子句(直接使用 MergeByReflection 并在 MergeByCode 中间接使用)变得更慢?

最佳答案

这是一个初步的评论。您示例中的 channel 都是无缓冲的,这意味着它们可能会在放置或获取时间时阻塞。

在这个例子中,除了 channel 管理,几乎没有其他处理。因此,性能主要由同步原语决定。实际上,可以并行化的代码非常少。

在 MergeByReflection 和 MergeByCode 函数中,select 用于监听多个输入 channel ,但没有采取任何措施来考虑输出 channel (因此可能会阻塞,而某些事件可能在其中一个输入 channel 上可用) .

在 MergeByGoRoutines 函数中,这种情况不会发生:当输出 channel 阻塞时,它不会阻止另一个输入 channel 被另一个 goroutine 读取。因此,运行时有更好的机会并行化 goroutine,并减少对输入 channel 的争用。

MergeByReflection 代码是最慢的,因为它有反射的开销,几乎没有什么可以并行化。

MergeByGoRoutines 函数是最快的,因为它减少了争用(需要较少的同步),并且因为输出争用对输入性能的影响较小。因此,在使用多核运行时(与其他两种方法相反),它可以从小的改进中获益。

MergeByReflection 和 MergeByCode 的同步事件太多,在多个内核上运行会对性能产生负面影响。不过,您可以通过使用缓冲 channel 获得不同的性能。

关于go - "fan in"- 一种 "fan out"行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29827896/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com