gpt4 book ai didi

go - 如何使用 goroutines 接收数据并发回信号

转载 作者:行者123 更新时间:2023-12-04 15:03:11 28 4
gpt4 key购买 nike

我想利用 Go 中的并发性 将数据发送到 goroutine 以使用 channel 进行处理和计算。数据点一个接一个地出现在一个函数中,这个函数可以是 main 函数,也可以是某个 sendData 函数。 如果可能,我希望从主函数发送数据。

我想将数据从发送函数发送到一个 goroutine,其中数据存储在一个 slice 中(我们称这个 goroutine getData)。某些计算是在这个 slice 上完成的。 在达到某个条件(取决于 slice )后,我希望 goroutine 向 sendData 函数发送信号,表明对某批数据点的处理已完成。 并且现在,sendData 函数不断通过 channel 将数据点发送到 getData goroutine,在 goroutine 中不断构建新的 slice ,当达到条件时发送信号 - 即处理完成并且整个序列不断重复。

举个例子,假设数字形式的数据正在从 sendData 发送到 getData。条件是 getData 接收到的数字的运行平均值应等于 4。让我们将以下数字序列作为我们的数据 - []int{3, 2, 3, 8 , 2, 1, 1, 1, 15}。在这里,第一批数字将是 {3, 2, 3, 8},因为在将这些数字按此顺序发送到 getData 后,它会发现运行平均值在 getData 接收到数字 8 后,数字的数量等于 4。然后它向 sendData 发送一个信号。发送数据的过程再次开始,下一批是 {2, 1, 1, 1, 15}。此处,在 getData 收到数字 15 后,它发现运行平均值等于 4,再次向 sendData 发送信号。 ( 这是一个非常基本的示例 - 在我的实际用例中,输入数据和条件更为复杂。我有将在 sendData 中实时读取的数据。这里每个数据点是按顺序读取,但每个数据点在前一个数据点之后几微秒到达。因此,数据的到达速度很快,我不想在这个函数中做太多的处理和计算。此外,我想保持并发完好,因为在读取数据的函数中,数据到达的速度很快。而且,我不希望因为在完成处理的goroutine中处理数据而错过这里的数据读取。 )

以下是我尝试构建代码的方式:

package main

import (
"fmt"
)

func main() {
ch := make(chan int)
go sendData(ch)
go getData(ch)
}

func sendData(ch chan int) {
syntheticData := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
for _, data := range syntheticData {
ch <- data
}
}

func getData(ch chan int) {
dataArr := []int{}
dataArr = append( dataArr, <-ch )
fmt.Println(dataArr)

if mean(dataArr) == 4{
close(ch)
}

}

func sum(array []int) int {
var result int = 0
for _, v := range array {
result += v
}

return result
}


func mean(array []int) float64 {
sumArr := float64(sum(array)) / float64(len(array))
return sumArr
}

我没有用上面的代码实现我想要的功能。 如何在 Go 中实现所需的功能?

最佳答案

你只需要一个extera接收goroutine,例如getData 然后 main goroutine 将在数据到达时使用名为 ch 的 channel 发送数据,您需要一个缓冲 信号 channel ,例如batchCompleted,还有一个 WaitGroup 等待 getData 同步,当它完成>.
就是这样,试试it :

package main

import (
"fmt"
"sync"
)

func main() {
wg := &sync.WaitGroup{}
ch := make(chan int)
batchCompleted := make(chan struct{}, 1) // non-blocking signaling channel
wg.Add(1)
go getData(ch, batchCompleted, wg)

syntheticData := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
i := 0
check := func() {
select {
case <-batchCompleted:
i++
fmt.Println(i, " batch completed")
default:
}
}
for _, data := range syntheticData {
ch <- data
check()
}
close(ch)
wg.Wait()
check()
}

func getData(ch chan int, batchCompleted chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
a := []int{}
sum, n := 0, 0
for v := range ch {
sum += v
n++
a = append(a, v)
if sum == 4*n {
batchCompleted <- struct{}{}
fmt.Println(a)
sum, n = 0, 0
a = a[:0]
}
}
if len(a) > 0 {
fmt.Println("remaining data:", a)
}
}

输出:

[3 2 3 8]
1 batch completed
[2 1 1 1 15]
2 batch completed

关于go - 如何使用 goroutines 接收数据并发回信号,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66624680/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com