multithreading - Running a loop in parallel with a timeout

I need to run the requests in parallel rather than one after the other, but with a timeout. Can I do this in Go?

This is the specific code I need to run in parallel. The trick is to also use a timeout, i.e. wait for all of the requests up to the timeout and collect the responses once they have all finished.

for _, test := range testers {
    checker := NewTap(test.name, test.url, test.timeout)
    res, err := checker.Check()
    if err != nil {
        fmt.Println(err)
        continue
    }
    fmt.Println(res.name)
    fmt.Println(res.res.StatusCode)
}

Here is the full (working) code: https://play.golang.org/p/cXnJJ6PW_CF

package main

import (
    "fmt"
    "net/http"
    "time"
)

type HT interface {
    Name() string
    Check() (*testerResponse, error)
}

type testerResponse struct {
    name string
    res  http.Response
}

type Tap struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func NewTap(name, url string, timeout time.Duration) *Tap {
    return &Tap{
        url:    url,
        name:   name,
        client: &http.Client{Timeout: timeout},
    }
}

func (p *Tap) Check() (*testerResponse, error) {
    req, err := http.NewRequest("GET", p.url, nil)
    if err != nil {
        return nil, err
    }
    res, err := p.client.Do(req)
    if err != nil {
        // e.g. the client's timeout was exceeded; res is nil here,
        // so return before dereferencing it
        return nil, err
    }
    defer res.Body.Close()
    return &testerResponse{name: p.name, res: *res}, nil
}

func (p *Tap) Name() string {
    return p.name
}

func main() {

    var checkers []HT

    testers := []Tap{
        {
            name:    "first call",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "second call",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
    }

    for _, test := range testers {
        checker := NewTap(test.name, test.url, test.timeout)
        res, err := checker.Check()
        if err != nil {
            fmt.Println(err)
            continue
        }
        fmt.Println(res.name)
        fmt.Println(res.res.StatusCode)

        checkers = append(checkers, checker)
    }
}

Best Answer

A popular concurrency pattern in Go is a worker pool.

A basic worker pool uses two channels: one to put jobs on and one to read results from. In this case, our jobs channel carries values of type Tap and our results channel carries values of type testerResponse.

Worker

Takes a job from the jobs channel and puts the result on the results channel.

// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up the "next" job.
// Note: this assumes the modified Check signature used in the full example
// below, which returns a single testerResponse instead of (*testerResponse, error).
func worker(jobs <-chan Tap, results chan<- testerResponse) {
    for n := range jobs {
        results <- n.Check()
    }
}

Jobs

To add jobs, we iterate over our testers and put them on the jobs channel.

// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- Tap, taps []Tap) {
    for _, t := range taps {
        jobs <- t
    }
}

Results

To read the results, we receive one result from the results channel for each tester.

// getResults reads one result from the results channel for every tester
// and prints it
func getResults(tr <-chan testerResponse, taps []Tap) {
    for range taps {
        r := <-tr
        status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
        if r.err != nil {
            status = r.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}

And finally, our main function.

func main() {
    // Make buffered channels
    buffer := len(testers)
    jobsPipe := make(chan Tap, buffer)               // Jobs will be of type `Tap`
    resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`

    // Create worker pool
    // Max workers default is 5
    // maxWorkers := 5
    // for i := 0; i < maxWorkers; i++ {
    //     go worker(jobsPipe, resultsPipe)
    // }

    // the loop above is the same as doing:
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    // ^^ this creates 5 workers..

    makeJobs(jobsPipe, testers)
    getResults(resultsPipe, testers)
}
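
Note that in this snippet the workers never return: the jobs channel is never closed, so each worker's range loop simply blocks once the jobs run out, and the program only ends because main returns after getResults. If you want the workers to shut down cleanly, one option is to start them in a loop, close the jobs channel after filling it, and wait for them with a sync.WaitGroup. A minimal sketch, not part of the original answer; runPool is a hypothetical helper and it needs "sync" imported:

func runPool(taps []Tap, maxWorkers int) {
    jobs := make(chan Tap, len(taps))
    results := make(chan testerResponse, len(taps))

    // start the workers and remember how many we have to wait for
    var wg sync.WaitGroup
    for i := 0; i < maxWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            worker(jobs, results) // returns once jobs is closed and drained
        }()
    }

    makeJobs(jobs, taps)
    close(jobs) // lets every worker's range loop terminate

    getResults(results, taps)
    wg.Wait() // all workers have exited at this point
}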

Putting it all together

I changed the timeout of the "second call" to one millisecond to show how the timeout works.
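
The testers list in the code below still uses 10- and 20-second timeouts; to reproduce the timeout error shown in the sample output at the end, one of the entries would need a deliberately tiny timeout, roughly like this (an illustrative tweak, not present in the code below):

{
    name:    "second call",
    url:     "http://www.example.com",
    timeout: time.Millisecond, // far shorter than a normal round trip, so the request times out
},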

package main

import (
    "fmt"
    "net/http"
    "time"
)

type HT interface {
    Name() string
    Check() testerResponse
}

type testerResponse struct {
    err  error
    name string
    res  http.Response
    url  string
}

type Tap struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func NewTap(name, url string, timeout time.Duration) *Tap {
    return &Tap{
        url:    url,
        name:   name,
        client: &http.Client{Timeout: timeout},
    }
}

func (p *Tap) Check() testerResponse {
    fmt.Printf("Fetching %s %s \n", p.name, p.url)
    // p comes straight from the testers slice, so its client field is nil;
    // build a client with the configured timeout before making the request
    nt := NewTap(p.name, p.url, p.timeout)
    res, err := nt.client.Get(p.url)
    if err != nil {
        return testerResponse{err: err}
    }

    // need to close the body
    res.Body.Close()
    return testerResponse{name: p.name, res: *res, url: p.url}
}

func (p *Tap) Name() string {
    return p.name
}

// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- Tap, taps []Tap) {
    for _, t := range taps {
        jobs <- t
    }
}

// getResults reads one result from the results channel for every tester
// and prints it
func getResults(tr <-chan testerResponse, taps []Tap) {
    for range taps {
        r := <-tr
        status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
        if r.err != nil {
            status = r.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}

// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up the "next" job
func worker(jobs <-chan Tap, results chan<- testerResponse) {
    for n := range jobs {
        results <- n.Check()
    }
}

var (
    testers = []Tap{
        {
            name:    "1",
            url:     "http://google.com",
            timeout: time.Second * 20,
        },
        {
            name:    "2",
            url:     "http://www.yahoo.com",
            timeout: time.Second * 10,
        },
        {
            name:    "3",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "4",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "5",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "6",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "7",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "8",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "9",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "10",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "11",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "12",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "13",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "14",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
    }
)

func main() {
    // Make buffered channels
    buffer := len(testers)
    jobsPipe := make(chan Tap, buffer)               // Jobs will be of type `Tap`
    resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`

    // Create worker pool
    // Max workers default is 5
    // maxWorkers := 5
    // for i := 0; i < maxWorkers; i++ {
    //     go worker(jobsPipe, resultsPipe)
    // }

    // the loop above is the same as doing:
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    // ^^ this creates 5 workers..

    makeJobs(jobsPipe, testers)
    getResults(resultsPipe, testers)
}

Which outputs:

// Fetching http://stackoverflow.com 
// Fetching http://www.example.com
// Get "http://www.example.com": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
// 'first call' to 'http://stackoverflow.com' was fetched with status '200'

Regarding multithreading - running a loop in parallel with a timeout, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/64126780/
