
asynchronous - Data processed asynchronously by multiple functions

Reposted · Author: 数据小太阳 · Updated: 2023-10-29 03:24:14

I receive data over HTTP that needs to be processed by two different functions. It is important that each function processes the data in order. For example, the file gets 1, 2, 3, 4, 5, and the database likewise records 1, 2, 3, 4, 5 - a FIFO model. The problem is that data keeps arriving, and sometimes the database takes a long time to complete an update, so the file cannot be updated in time. What matters to me is that data is appended to the file or the database whenever each of them is able to accept it. I could use a buffered channel, but I do not know how much data may be waiting in the queue, and I do not want to declare an arbitrarily large buffer size just in case. I tried adding more goroutines to the NewData function, but then my data was no longer written in order.

This code demonstrates the problem.

package main

import (
	"fmt"
	"time"
)

type procHandler interface {
	Start()
	NewData(newdata []byte)
}

type fileWriter struct {
	Data chan []byte
}

func (proc *fileWriter) Start() {
	proc.Data = make(chan []byte)
	go func() {
		for {
			obj := <-proc.Data

			fmt.Printf("proc %T ", proc)
			fmt.Println(obj)
		}
	}()
}

func (proc *fileWriter) NewData(newdata []byte) {
	proc.Data <- newdata
}

type sqlWriter struct {
	Data chan []byte
}

func (proc *sqlWriter) Start() {
	proc.Data = make(chan []byte)
	go func() {
		for {
			obj := <-proc.Data
			time.Sleep(5 * time.Second)
			fmt.Printf("proc %T ", proc)
			fmt.Println(obj)
		}
	}()
}

func (proc *sqlWriter) NewData(newdata []byte) {
	proc.Data <- newdata
}

var processors = []procHandler{}

func receiver() {
	newDataImitateByteRange := 30
	for i := 0; i < newDataImitateByteRange; i++ {
		pseudoData := []byte{byte(i)}

		for _, handler := range processors {
			handler.NewData(pseudoData)
		}
	}
}

func main() {
	// file writer
	fileUpdate := &fileWriter{}
	processors = append(processors, fileUpdate)

	// sql writer
	sqlUpdate := &sqlWriter{}
	processors = append(processors, sqlUpdate)

	sqlUpdate.Start()
	fileUpdate.Start()

	go receiver()

	fmt.Scanln()
}

Runnable code: https://play.golang.org/p/rSshsJYZ4h

Output:

proc *main.fileWriter [0]
proc *main.fileWriter [1]
proc *main.sqlWriter [0] (sleep)
proc *main.fileWriter [2] (displayed after 5 seconds, once the previous item has been processed)
proc *main.sqlWriter [1] (sleep)
proc *main.fileWriter [3] (displayed after 5 seconds, once the previous item has been processed)
proc *main.sqlWriter [2]
proc *main.fileWriter [4]
proc *main.sqlWriter [3]
proc *main.fileWriter [5]
proc *main.sqlWriter [4]
proc *main.fileWriter [6]

What I want:

proc *main.fileWriter [0]
proc *main.fileWriter [1]
proc *main.fileWriter [2]
proc *main.fileWriter [3]
proc *main.fileWriter [4]
proc *main.fileWriter [5]
proc *main.fileWriter [6]
proc *main.sqlWriter [0] (the handler starts executing after the 5-second sleep)
proc *main.sqlWriter [1] (sleep)
proc *main.sqlWriter [2] (sleep)
proc *main.sqlWriter [3] (sleep)
proc *main.sqlWriter [4] (sleep)
proc *main.sqlWriter [5] (sleep)
proc *main.sqlWriter [6] (sleep)

Hoping for some help, thanks!

Best Answer

It sounds like what you are looking for is a channel that resizes (grows or shrinks) according to the amount of data queued in it. This can be implemented by placing a queue between an input channel and an output channel and servicing both with a single goroutine. Here is such a solution: https://github.com/gammazero/bigchan#bigchan

I used a BigChan as the Data channel in both your fileWriter and your sqlWriter, and it appears to produce the results you are looking for. Here is your modified code:

package main

import (
	"fmt"
	"time"

	"github.com/gammazero/bigchan"
)

// Maximum number of items to buffer. Set to -1 for unlimited.
const limit = 65536

type procHandler interface {
	Start()
	NewData(newdata []byte)
}

type fileWriter struct {
	Data *bigchan.BigChan
}

func (proc *fileWriter) Start() {
	proc.Data = bigchan.New(limit)
	go func() {
		for {
			_obj := <-proc.Data.Out()
			obj := _obj.([]byte)

			fmt.Printf("proc %T ", proc)
			fmt.Println(obj)
		}
	}()
}

func (proc *fileWriter) NewData(newdata []byte) {
	proc.Data.In() <- newdata
}

type sqlWriter struct {
	Data *bigchan.BigChan
}

func (proc *sqlWriter) Start() {
	proc.Data = bigchan.New(limit)

	go func() {
		for {
			_obj := <-proc.Data.Out()
			obj := _obj.([]byte)
			time.Sleep(5 * time.Second)
			fmt.Printf("proc %T ", proc)
			fmt.Println(obj)
		}
	}()
}

func (proc *sqlWriter) NewData(newdata []byte) {
	proc.Data.In() <- newdata
}

var processors = []procHandler{}

func receiver() {
	newDataImitateByteRange := 30
	for i := 0; i < newDataImitateByteRange; i++ {
		pseudoData := []byte{byte(i)}

		for _, handler := range processors {
			handler.NewData(pseudoData)
		}
	}
}

func main() {
	// file writer
	fileUpdate := &fileWriter{}
	processors = append(processors, fileUpdate)

	// sql writer
	sqlUpdate := &sqlWriter{}
	processors = append(processors, sqlUpdate)

	sqlUpdate.Start()
	fileUpdate.Start()

	go receiver()

	fmt.Scanln()
}

Regarding "asynchronous - data processed asynchronously by multiple functions", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46519440/
