gpt4 book ai didi

mysql - 处理查询的工作池

转载 作者:IT王子 更新时间:2023-10-29 01:13:46 24 4
gpt4 key购买 nike

我是 Go 的新手,正在寻找一种方法来使用 100 个工作人员处理 3000 个查询,并确保每个工作人员都有一个连接(MySQL 已经配置了超过 100 个连接)。这是我的尝试:

package main

import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
)

var query *sql.Stmt

func worker(jobs <-chan int, results chan<- int) {

for _ = range jobs {

_, e := query.Exec("a")

if e != nil {

panic(e.Error())
}

results <- 1
}
}

func main() {

workers := 100

db, e := sql.Open("mysql", "foo:foo@/foo")

if e != nil {

panic(e.Error())
}

db.SetMaxOpenConns(workers)
db.SetMaxIdleConns(workers)

defer db.Close()

query, e = db.Prepare("INSERT INTO foo (foo) values(?)")

if e != nil {

panic(e.Error())
}

total := 30000
jobs := make(chan int, total)
results := make(chan int, total)

for w := 0; w < workers; w++ {

go worker(jobs, results)
}

for j := 0; j < total; j++ {

jobs <- j
}

close(jobs)

for r := 0; r < total; r++ {

<-results
}
}

它有效,但我不确定这是否是最好的方法。

如果您认为这是基于意见或根本不是一个好问题,请将其标记为已关闭并发表评论解释原因。

最佳答案

您所获得的基本上是有效的,但是要摆脱缓冲,您需要同时写入 jobs 并从 results 中读取。否则,您的流程最终会停滞不前——工作人员无法发送结果,因为没有任何东西收到它们,您也无法插入工作,因为工作人员被阻止了。

这是 a boiled-down example on the Playground关于如何在 main 中接收结果时在后台推送作业的工作队列:

package main

import "fmt"

func worker(jobs <-chan int, results chan<- int) {
for _ = range jobs {
// ...do work here...
results <- 1
}
}

func main() {
workers := 10
total := 30
jobs := make(chan int)
results := make(chan int)

// start workers
for w := 0; w < workers; w++ {
go worker(jobs, results)
}

// insert jobs in background
go func() {
for j := 0; j < total; j++ {
jobs <- j
}
}()

// collect results
for i := 0; i < total; i++ {
<-results
fmt.Printf(".")
}

close(jobs)
}

要使该特定代码起作用,您必须知道您将获得多少结果。如果您不知道(例如,每个作业可能产生零个或多个结果),您可以使用 sync.WaitGroupwait for the workers to finish, then close the result stream :

package main

import (
"fmt"
"sync"
)

func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
for _ = range jobs {
// ...do work here...
results <- 1
}
wg.Done()
}

func main() {
workers := 10
total := 30
jobs := make(chan int)
results := make(chan int)
wg := &sync.WaitGroup{}

// start workers
for w := 0; w < workers; w++ {
wg.Add(1)
go worker(jobs, results, wg)
}

// insert jobs in background
go func() {
for j := 0; j < total; j++ {
jobs <- j
}
close(jobs)
wg.Wait()
// all workers are done so no more results
close(results)
}()

// collect results
for _ = range results {
fmt.Printf(".")
}

}

还有许多其他更复杂的技巧可以用来在发生错误后停止所有工作程序,将结果按与原始作业相同的顺序排列,或者做其他类似的事情。不过,听起来好像基本版本在这里工作。

关于mysql - 处理查询的工作池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26657039/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com