gpt4 book ai didi

Google Pub/Sub 消息排序不起作用(或将延迟增加到 10 秒以上)?

转载 作者:行者123 更新时间:2023-12-01 22:11:30 25 4
gpt4 key购买 nike

我正在尝试制作一个简化示例来演示如何使用 Google Pub/Sub 的消息排序功能 (https://cloud.google.com/pubsub/docs/ordering)。从这些文档中,为订阅启用消息排序后,

After the message ordering property is set, the Pub/Sub service delivers messages with the same ordering key in the order that the Pub/Sub service receives the messages. For example, if a publisher sends two messages with the same ordering key, the Pub/Sub service delivers the oldest message first.


我用它来编写以下示例:
package main

import (
"context"
"log"
"time"

"cloud.google.com/go/pubsub"
uuid "github.com/satori/go.uuid"
)

func main() {
client, err := pubsub.NewClient(context.Background(), "my-project")
if err != nil {
log.Fatalf("NewClient: %v", err)
}

topicID := "test-topic-" + uuid.NewV4().String()
topic, err := client.CreateTopic(context.Background(), topicID)
if err != nil {
log.Fatalf("CreateTopic: %v", err)
}
defer topic.Delete(context.Background())

subID := "test-subscription-" + uuid.NewV4().String()
sub, err := client.CreateSubscription(context.Background(), subID, pubsub.SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: true,
})
if err != nil {
log.Fatalf("CreateSubscription: %v", err)
}
defer sub.Delete(context.Background())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

messageReceived := make(chan struct{})
go sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
log.Printf("Received message with ordering key %s: %s", msg.OrderingKey, msg.Data)
msg.Ack()
messageReceived <- struct{}{}
})

topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang1!"), OrderingKey: "foobar"})
topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang2!"), OrderingKey: "foobar"})

for i := 0; i < 2; i++ {
select {
case <-messageReceived:
case <-time.After(10 * time.Second):
log.Fatal("Expected to receive a message, but timed out after 10 seconds.")
}
}
}
首先,我尝试了没有指定 OrderingKey: "foobar" 的程序。在 topic.Publish()来电。这导致以下输出:
> go run main.go
2020/08/10 21:40:34 Received message with ordering key : Dang2!
2020/08/10 21:40:34 Received message with ordering key : Dang1!
换句话说,消息的接收顺序与它们发布的顺序不同,这在我的用例中是不可取的,我想通过指定 OrderingKey 来防止。
但是,只要我添加了 OrderingKey s 在发布调用中,程序在等待接收 Pub/Sub 消息 10 秒后超时:
> go run main.go
2020/08/10 21:44:36 Expected to receive a message, but timed out after 10 seconds.
exit status 1
我希望现在首先收到消息 Dang1!后跟 Dang2! ,但我没有收到任何消息。知道为什么这没有发生吗?

最佳答案

发布失败并出现以下错误:Failed to publish: Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering .
如果您更改发布调用以检查错误,您可以看到这一点:

res1 := topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang1!"), OrderingKey: "foobar"})
res2 := topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang2!"), OrderingKey: "foobar"})

_, err = res1.Get(ctx)
if err != nil {
fmt.Printf("Failed to publish: %v", err)
return
}

_, err = res2.Get(ctx)
if err != nil {
fmt.Printf("Failed to publish: %v", err)
return
}
要修复它,请添加一行以启用主题的消息排序。您的主题创建如下:
topic, err := client.CreateTopic(context.Background(), topicID)
if err != nil {
log.Fatalf("CreateTopic: %v", err)
}
topic.EnableMessageOrdering = true
defer topic.Delete(context.Background())

关于Google Pub/Sub 消息排序不起作用(或将延迟增加到 10 秒以上)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63351729/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com