gpt4 book ai didi

go - GCP发布/订阅:使用goroutines使多个订户在一个应用程序中运行

转载 作者:行者123 更新时间:2023-12-01 21:13:04 26 4
gpt4 key购买 nike

从GCP发布/订阅接收消息时,我发现一个奇怪的行为。
以下代码是我如何使用pubsub client注册订阅的方式

gcp.go

package gcp

import (
"context"
"path"
"runtime"

"google.golang.org/api/option"

"cloud.google.com/go/pubsub"
)

// PubsubClient is the GCP pubsub service client.
var PubsubClient *pubsub.Client

// Initialize initializes GCP client service using the environment.
func Initialize(env, projectName string) error {
var err error
ctx := context.Background()
credentialOpt := option.WithCredentialsFile(getFilePathByEnv(env))

PubsubClient, err = pubsub.NewClient(ctx, projectName, credentialOpt)
return err
}

// GetTopic returns the specified topic in GCP pub/sub service and create it if it not exist.
func GetTopic(topicName string) (*pubsub.Topic, error) {
topic := PubsubClient.Topic(topicName)
ctx := context.Background()
isTopicExist, err := topic.Exists(ctx)

if err != nil {
return topic, err
}

if !isTopicExist {
ctx = context.Background()
topic, err = PubsubClient.CreateTopic(ctx, topicName)
}

return topic, err
}

// GetSubscription returns the specified subscription in GCP pub/sub service and creates it if it not exist.
func GetSubscription(subName string, topic *pubsub.Topic) (*pubsub.Subscription, error) {
sub := PubsubClient.Subscription(subName)
ctx := context.Background()
isSubExist, err := sub.Exists(ctx)

if err != nil {
return sub, err
}

if !isSubExist {
ctx = context.Background()
sub, err = PubsubClient.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{Topic: topic})
}

return sub, err
}

func getFilePathByEnv(env string) string {
_, filename, _, _ := runtime.Caller(1)

switch env {
case "local":
return path.Join(path.Dir(filename), "local.json")
case "development":
return path.Join(path.Dir(filename), "development.json")
case "staging":
return path.Join(path.Dir(filename), "staging.json")
case "production":
return path.Join(path.Dir(filename), "production.json")
default:
return path.Join(path.Dir(filename), "local.json")
}
}

main.go
package main

import (
"context"
"fmt"
"log"
"net/http"
"runtime"
"runtime/debug"
"runtime/pprof"
"time"

"rpriambudi/pubsub-receiver/gcp"

"cloud.google.com/go/pubsub"
"github.com/go-chi/chi"
)

func main() {
log.Fatal(http.ListenAndServe(":4001", Route()))
}

func Route() *chi.Mux {
InitializeSubscription()
chiRoute := chi.NewRouter()

chiRoute.Route("/api", func(r chi.Router) {
r.Get("/_count", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Number of goroutines: %v", runtime.NumGoroutine())
})

r.Get("/_stack", getStackTraceHandler)
})

return chiRoute
}

func InitializeSubscription() {
gcp.Initialize("local", "fifth-bonbon-277102")

go pubsubHandler("test-topic-1", "test-topic-1-subs")
go pubsubHandler("test-topic-2", "test-topic-2-subs")
go pubsubHandler("test-topic-3", "test-topic-3-subs")
// ....

return
}

func getStackTraceHandler(w http.ResponseWriter, r *http.Request) {
stack := debug.Stack()
w.Write(stack)

pprof.Lookup("goroutine").WriteTo(w, 2)
}

func pubsubHandler(topicID string, subscriptionID string) {
topic, err := gcp.GetTopic(topicID)
fmt.Println("topic: ", topic)
if err != nil {
fmt.Println("Failed get topic: ", err)
return
}

sub, err := gcp.GetSubscription(subscriptionID, topic)
fmt.Println("subscription: ", sub)
if err != nil {
fmt.Println("Get subscription err: ", err)
return
}

err = sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) {
messageHandler(subscriptionID, ctx, msg)
})
if err != nil {
fmt.Println("receive error: ", err)
}
}

func messageHandler(subscriptionID string, ctx context.Context, msg *pubsub.Message) {
defer func() {
if r := recover(); r != nil {
fmt.Println("recovered from panic.")
msg.Ack()
}
}()

fmt.Println("message of subscription: ", subscriptionID)
fmt.Println("Message ID: ", string(msg.ID))
fmt.Println("Message received: ", string(msg.Data))

msg.Ack()
time.Sleep(10 * time.Second)
}

当我在 pubsubHandler中只包含一些 InitializeSubscription时,它会很好用。但是,当我在initialize函数(大约10个或更多处理程序)中添加更多 pubsubHandler时,事情开始变得很有趣。 ACK永远不会到达pubsub服务器,这使得消息根本没有被确认(我已经在指标浏览器中检查了 AcknowledgeRequest,并且没有确认请求到来)。因此,消息不断返回给订户。另外,当我重新启动应用程序时,有时它不会收到任何消息,无论是新消息还是未确认的消息。

我似乎通过将 NumGoroutines函数中的每个订阅对象的 1设置为 pubsubHandler来找到一种解决方法。
func pubsubHandler(topicID string, subscriptionID string) {
....

sub, err := gcp.GetSubscription(subscriptionID, topic)

....

sub.ReceiverSettings.NumGoroutines = 1
err = sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) {
messageHandler(subscriptionID, ctx, msg)
})

....
}

我的问题是,这是预期的行为吗?可能导致这些意外行为的根本原因是什么?还是我的实现完全错误,无法达到预期的结果? (在一个应用程序中有多个订阅)。还是在创建订阅处理程序时有什么最佳实践可遵循?

以我的理解, Receive中的 pubsub.Subscription函数本身就是一个阻止代码。因此,当我尝试在goroutine中运行它时,可能会导致意外的副作用,尤其是如果我们不限制可以处理消息的goroutine的数量时。我的推理有效吗?
谢谢您的回答,祝您有美好的一天!

编辑1 :将示例更新为完整代码,因为之前pubsub客户端没有直接导入main.go中。

最佳答案

我认为问题可能出在您处理邮件的速率(目前每条邮件10秒)。如果您一次收到太多消息,则客户端可能不堪重负,这将导致积压的消息堆积。
我建议您使用flow control settings并将ReceiveSettings.NumGoroutines增大到比默认值10高的值。如果发布率很高,则还可以增大MaxOutstandingMessages或将其设置为-1来完全禁用该限制。这告诉客户端一次保留更多消息,每个Receive调用共享一个限制。

关于go - GCP发布/订阅:使用goroutines使多个订户在一个应用程序中运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62635263/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com