gpt4 book ai didi

multithreading - 如何等待执行

转载 作者:IT王子 更新时间:2023-10-29 02:33:44 26 4
gpt4 key购买 nike

我有一个您想要并行分析的大型日志文件。

我有这个代码:

package main

import (
"bufio"
"fmt"
"os"
"time"
)

func main() {
filename := "log.txt"
threads := 10

// Read the file
file, err := os.Open(filename)
if err != nil {
fmt.Println("Could not open file with the database.")
os.Exit(1)
}
defer file.Close()

scanner := bufio.NewScanner(file)

// Channel for strings
tasks := make(chan string)

// Run the threads that catch events from the channel and understand one line of the log file
for i := 0; i < threads; i++ {
go parseStrings(tasks)
}

// Start a thread load lines from a file into the channel
go getStrings(scanner, tasks)

// At this point I have to wait until all of the threads executed
// For example, I set the sleep
for {
time.Sleep(1 * time.Second)
}
}

func getStrings(scanner *bufio.Scanner, tasks chan<- string) {
for scanner.Scan() {
s := scanner.Text()
tasks <- s
}
}

func parseStrings(tasks <-chan string) {
for {
s := <-tasks
event := parseLine(s)
fmt.Println(event)
}
}

func parseLine(line string) string {
return line
}

实际上,正如我等待所有线程结束?我被建议创建一个单独的线程,我将在其中添加结果,但是如何添加?

最佳答案

使用管道模式和“扇出/扇入”模式:

package main

import (
"bufio"
"fmt"
"strings"
"sync"
)

func main() {
file := "here is first line\n" +
"here is second line\n" +
"here is line 3\n" +
"here is line 4\n" +
"here is line 5\n" +
"here is line 6\n" +
"here is line 7\n"
scanner := bufio.NewScanner(strings.NewReader(file))

// all lines onto one channel
in := getStrings(scanner)

// FAN OUT
// Multiple functions reading from the same channel until that channel is closed
// Distribute work across multiple functions (ten goroutines) that all read from in.
xc := fanOut(in, 10)

// FAN IN
// multiplex multiple channels onto a single channel
// merge the channels from c0 through c9 onto a single channel
for n := range merge(xc) {
fmt.Println(n)
}
}

func getStrings(scanner *bufio.Scanner) <-chan string {
out := make(chan string)
go func() {
for scanner.Scan() {
out <- scanner.Text()
}
close(out)
}()
return out
}

func fanOut(in <-chan string, n int) []<-chan string {
var xc []<-chan string
for i := 0; i < n; i++ {
xc = append(xc, parseStrings(in))
}
return xc
}

func parseStrings(in <-chan string) <-chan string {
out := make(chan string)
go func() {
for n := range in {
out <- parseLine(n)
}
close(out)
}()
return out
}

func parseLine(line string) string {
return line
}

func merge(cs []<-chan string) <-chan string {
var wg sync.WaitGroup
wg.Add(len(cs))

out := make(chan string)
for _, c := range cs {
go func(c <-chan string) {
for n := range c {
out <- n
}
wg.Done()
}(c)
}

go func() {
wg.Wait()
close(out)
}()
return out
}

Check it out on the playground .

关于multithreading - 如何等待执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34417872/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com