gpt4 book ai didi

Go 例程以 for-loop 开始——一个还是多个 channel ?

转载 作者:IT王子 更新时间:2023-10-29 01:43:26 25 4
gpt4 key购买 nike

我想使用从 for 循环调用的 goroutine 加载一些 json 文件(“.json”)。我想并行加载(在加载其他文件的同时处理第一个文件)。

Q1。由于文件的数量可能会有所不同(要添加新文件),我会使用带有文件名的(文件)列表(仅在此示例中自动生成名称),因此我想使用 for 循环。最优?

第二季度。什么是最有效的 channel 使用方式。

第 3 季度。如果每个加载操作都需要一个唯一的 channel (如下面的示例代码所示),我将如何定义 channel ?

示例代码(要压缩并能够使用文件名列表加载文件):


func load_json(aChan chan byte, s string) {
// load "filename" + s + ".json"
// confirm to the channel
aChan <- 0
}

func do_stuff() {
// .. with the newly loaded json
}

func Main() {
chan_A := make(chan byte)
go load_json(chan_A, "_classA")

chan_B := make(chan byte)
go load_json(chan_B, "_classB")

chan_C := make(chan byte)
go load_json(chan_C, "_classC")

chan_D := make(chan byte)
go load_json(chan_D, "_classD")


<-chan_A
// Now, do stuff with Class A
<-chan_B
// etc...
<-chan_C
<-chan_D
fmt.Println("Done.")
}

编辑:我根据“汤姆”(见下文)建议的想法设计了一个简化的测试方案。在我的例子中,我将任务分为三个阶段,每个阶段使用一个 channel 来控制执行。但是,我往往会遇到此代码的死锁(请参阅执行结果和代码下方的注释)。

PlayGround 上运行此代码.

如何避免这段代码中的死锁?:

type TJsonFileInfo struct {
FileName string
}
type TChannelTracer struct { // Will count & display visited phases A, B, C
A, B, C int
}
var ChannelTracer TChannelTracer

var jsonFileList = []string{
"./files/classA.json",
"./files/classB.json",
"./files/classC.json",
}

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
var newFileInfo TJsonFileInfo
newFileInfo.FileName = aFileName
// file, e := ioutil.ReadFile(newFileInfo.FileName)...
ChannelTracer.A += 1
fmt.Printf("A. Loaded file: %s\n", newFileInfo.FileName)
aResultQueueChan <- &newFileInfo
}

func UnmarshalFile(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
FileInfo := <-aWorkQueueChan
ChannelTracer.B += 1
fmt.Printf("B. Marshalled file: %s\n", FileInfo.FileName)
aResultQueueChan <- FileInfo
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
FileInfo := <-aWorkQueueChan
ChannelTracer.C += 1
fmt.Printf("C. Processed file: %s \n", FileInfo.FileName)
aDoneQueueChan <- FileInfo
}

func main() {
marshalChan := make(chan *TJsonFileInfo)
processChan := make(chan *TJsonFileInfo)
doneProcessingChan := make(chan *TJsonFileInfo)

for _, fileName := range jsonFileList {
go LoadJsonFiles(fileName, marshalChan)
go UnmarshalFile(marshalChan, processChan)
go ProcessWork(processChan, doneProcessingChan)
}

for {
select {
case result := <-marshalChan:
result.FileName = result.FileName // dummy use
case result := <-processChan:
result.FileName = result.FileName // dummy use
case result := <-doneProcessingChan:
result.FileName = result.FileName // dummy use
fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
}
}
}

/**
RESULTS (for phases A, B and C):

A. Loaded file: ./files/classA.json
A. Loaded file: ./files/classB.json
A. Loaded file: ./files/classC.json
B. Marshalled file: ./files/classB.json
B. Marshalled file: ./files/classC.json
C. Processed file: ./files/classB.json
C. Processed file: ./files/classC.json
Done. Channels visited: {3 2 2} // ChannelTracer for phase A, B and C
Done. Channels visited: {3 2 2}
fatal error: all goroutines are asleep - deadlock!
*/

注意此代码不访问文件系统,因此它应该在 PlayGround 上运行

EDIT2:- 除了不安全的“ChannelTracer”,我只能通过使用与文件任务相同次数的 doneProcessingChannel 来避免死锁。
在此处运行代码: Playground

func main() {
marshalChan := make(chan *TJsonFileInfo)
processChan := make(chan *TJsonFileInfo)
doneProcessingChan := make(chan *TJsonFileInfo)

go UnmarshalFiles(marshalChan, processChan)
go ProcessWork(processChan, doneProcessingChan)

for _, fileName := range jsonFileList {
go LoadJsonFiles(fileName, marshalChan)
}

// Read doneProcessingChan equal number of times
// as the spawned tasks (files) above :
for i := 0; i < len(jsonFileList); i++ {
<-doneProcessingChan
fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
}
}

//里尔

最佳答案

建立在 answer 上通过 @BraveNewCurrency我编写了一个简单的示例 program给你:

package main

import (
"encoding/json"
"fmt"
"os"
)

type Result struct {
Some string
Another string
AndAn int
}

func generateWork(work chan *os.File) {
files := []string{
"/home/foo/a.json",
"/home/foo/b.json",
"/home/foo/c.json",
}
for _, path := range files {
file, e := os.Open(path)
if e != nil {
panic(e)
}
work <- file
}
}

func processWork(work chan *os.File, done chan Result) {
file := <-work
decoder := json.NewDecoder(file)
result := Result{}
decoder.Decode(&result)
done <- result
}

func main() {
work := make(chan *os.File)
go generateWork(work)
done := make(chan Result)
for i := 0; i < 100; i++ {
go processWork(work, done)
}
for {
select {
case result := <-done:
// a result is available
fmt.Println(result)
}
}
}

请注意,该程序无法在 playground 上运行,因为那里不允许访问文件系统。

编辑:

为了回答您问题中的版本,我获取了代码和 changed some small things :

package main

import (
_ "encoding/json"
"fmt"
_ "io/ioutil"
_ "os"
)

type TJsonMetaInfo struct {
MetaSystem string
}

type TJsonFileInfo struct {
FileName string
}

type TChannelTracer struct { // Will count & display visited phases A, B, C
A, B, C int
}

var ChannelTracer TChannelTracer

var jsonFileList = []string{
"./files/classA.json",
"./files/classB.json",
"./files/classC.json",
}

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
newFileInfo := TJsonFileInfo{aFileName}
// file, e := ioutil.ReadFile(newFileInfo.FileName)
// etc...
ChannelTracer.A += 1
fmt.Printf("A. Loaded file: %s\n", newFileInfo.FileName)
aResultQueueChan <- &newFileInfo
}

func UnmarshalFiles(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
for {
FileInfo := <-aWorkQueueChan
ChannelTracer.B += 1
fmt.Printf("B. Unmarshalled file: %s\n", FileInfo.FileName)
aResultQueueChan <- FileInfo
}
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
for {
FileInfo := <-aWorkQueueChan
ChannelTracer.C += 1
fmt.Printf("C. Processed file: %s \n", FileInfo.FileName)
aDoneQueueChan <- FileInfo

}
}

func main() {
marshalChan := make(chan *TJsonFileInfo)
processChan := make(chan *TJsonFileInfo)
doneProcessingChan := make(chan *TJsonFileInfo)

go UnmarshalFiles(marshalChan, processChan)
go ProcessWork(processChan, doneProcessingChan)

for _, fileName := range jsonFileList {
go LoadJsonFiles(fileName, marshalChan)
}

for {
select {
case result := <-doneProcessingChan:
result.FileName = result.FileName // dummy use
fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
}
}
}

请注意,这段代码仍然死锁,但最后,当所有工作都完成时,在 main() 的最后一个空 for 循环中。

还要注意这些行:

ChannelTracer.A += 1
ChannelTracer.B += 1
ChannelTracer.C += 1

不是并发安全的。这意味着在多线程环境中,一个 goroutine 和另一个 goroutine 可能会尝试同时递增同一个计数器,从而导致错误的计数。要解决此问题,请查看以下软件包:

关于Go 例程以 for-loop 开始——一个还是多个 channel ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16873000/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com