gpt4 book ai didi

go - 处理来自 RabbitMQ 的消息时限制并发

转载 作者:IT王子 更新时间:2023-10-29 02:32:19 26 4
gpt4 key购买 nike

我正在尝试从队列 (RabbitMQ) 中读取 URL 并发出有限数量的并发 HTTP 请求,即有一个 10 人的工作池对从队列中接收到的 URL 发出并发请求(永远)。

到目前为止,我已经按照 RabbitMQ 教程实现了一个消费者: https://www.rabbitmq.com/tutorials/tutorial-one-go.html

并尝试了网络上发现的示例中的许多方法,以这里的示例结尾: http://jmoiron.net/blog/limiting-concurrency-in-go/

不幸的是,我当前的代码运行大约一分钟,然后无限期地卡住。我试过添加/移动 Go 例程,但我似乎无法让它按预期工作(我是 Go 的新手)。

当前代码:

package main

import (
"fmt"
"log"
"net/http"
"time"

"github.com/Xide/bloom"
"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}

var netClient = &http.Client{
Timeout: time.Second * 10,
}

func getRequest(url string) {
//resp, err := http.Get(string(url))
resp, err := netClient.Get(string(url))
if err != nil {
log.Printf("HTTP request error: %s", err)
return
}
fmt.Println("StatusCode:", resp.StatusCode)
fmt.Println(resp.Request.URL)
}

func main() {
bf := bloom.NewDefaultScalable(0.1)

conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"urls", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, //global
)
failOnError(err, "Failed to set Qos")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

concurrency := 10
sem := make(chan bool, concurrency)
go func() {
for d := range msgs {
sem <- true
url := string(d.Body)
if bf.Match(url) == false {
bf.Feed(url)
log.Printf("Not seen: %s", d.Body)
go func(url string) {
defer func() { <-sem }()
getRequest(url)
}(url)
} else {
log.Printf("Already seen: %s", d.Body)
}
d.Ack(false)
}
for i := 0; i < cap(sem); i++ {
sem <- true
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}

最佳答案

您没有正确处理您的 HTTP 响应,导致打开的连接越来越多。试试这个:

func getRequest(url string) {
resp, err := netClient.Get(string(url))
if err != nil {
log.Printf("HTTP request error: %s", err)
return
}
// Add this bit:
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
fmt.Println("StatusCode:", resp.StatusCode)
fmt.Println(resp.Request.URL)
}

在您阅读完来自 channel 的消息后,这似乎是不必要的并且可能存在问题:

    for i := 0; i < cap(sem); i++ {
sem <- true
}

为什么在读取队列中的所有消息后填充 sem channel ?您已经向 channel 添加了与您希望从中读取的消息一样多的消息,所以这充其量是毫无意义的,如果您对其余代码进行了错误的更改,可能会导致问题。

与您的问题无关,但这是多余的:

if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}

根据 the documentation , Fatalf 已经退出,所以 panic 永远不会被调用。如果你想登录并 panic,试试 log.Panicf ,专为此目的而设计。

关于go - 处理来自 RabbitMQ 的消息时限制并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45858684/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com