gpt4 book ai didi

go - RabbitMQ 发布/订阅实现不工作

转载 作者:IT王子 更新时间:2023-10-29 02:13:09 33 4
gpt4 key购买 nike

我已经转换了 RabbitMQ pub/sub tutorial进入下面的虚拟测试。不知何故,它没有按预期工作。

amqpURL 是一个有效的 AMQP 服务(即 RabbitMQ)URL。我已经用队列示例对其进行了测试并且它有效。不知何故,它在“交换”中​​失败了

我希望 TestDummy 记录“[x] Hello World”。不知何故它没有发生。只有发送部分按预期工作。

我做错了什么?

import (
"fmt"
"log"
"testing"

"github.com/streadway/amqp"
)

func TestDummy(t *testing.T) {
done := exchangeReceive()
exchangeSend("Hello World")
<-done
}

func exchangeSend(msg string) {
failOnError := func(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}

log.Printf("exchangeSend: connect %s", amqpURL)
conn, err := amqp.Dial(amqpURL)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")

body := []byte(msg)
err = ch.Publish(
"logs", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")

log.Printf(" [x] Sent %s", body)
}

func exchangeReceive() <-chan bool {

done := make(chan bool)

failOnError := func(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}

log.Printf("exchangeReceive: connect %s", amqpURL)
conn, err := amqp.Dial(amqpURL)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")

q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
done <- true
}
}()

log.Printf(" [*] Waiting for logs. To exit press CTRL+C")

return done
}

最佳答案

这里有些愚蠢的错误。当 exchangeRecieve 结束时,延迟语句被触发并因此关闭连接。这就是我重写失败的原因。

我以这种方式更改了我的代码并且它起作用了:

import (
"fmt"
"os"
"testing"
"time"

"github.com/streadway/amqp"
)

func TestDummy(t *testing.T) {
amqpURL := os.Getenv("CLOUDAMQP_URL")
t.Logf(" [*] amqpURL: %s", amqpURL)

results1 := exchangeReceive(t, "consumer 1", amqpURL)
results2 := exchangeReceive(t, "consumer 2", amqpURL)
time.Sleep(50 * time.Millisecond)

exchangeSend(t, amqpURL, "Hello World")
if want, have := "Hello World", <-results1; want != have {
t.Errorf("expected %#v, got %#v", want, have)
}
if want, have := "Hello World", <-results2; want != have {
t.Errorf("expected %#v, got %#v", want, have)
}
}

func exchangeReceive(t *testing.T, name, amqpURL string) <-chan string {

out := make(chan string)

failOnError := func(err error, msg string) {
if err != nil {
t.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}

conn, err := amqp.Dial(amqpURL)
failOnError(err, "Failed to connect to RabbitMQ")

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")

err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")

q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

go func() {
for d := range msgs {
t.Logf(" [x] %s received: %s", name, d.Body)
out <- string(d.Body)
}
}()

t.Logf(" [*] %s ready to receive", name)
return out
}

func exchangeSend(t *testing.T, amqpURL, msg string) {
failOnError := func(err error, msg string) {
if err != nil {
t.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}

conn, err := amqp.Dial(amqpURL)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")

body := []byte(msg)
err = ch.Publish(
"logs", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")

t.Logf(" [x] Sent %s", body)
}

关于go - RabbitMQ 发布/订阅实现不工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39825884/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com