gpt4 book ai didi

google-cloud-pubsub - 确认的消息在 Google Pubsub 中徘徊

转载 作者:行者123 更新时间:2023-12-04 22:03:50 26 4
gpt4 key购买 nike

根据堆栈驱动程序图表,我们开始注意到某个主题/订阅的“未确认消息”数量不时增加。

症状

我不知道我们可以信任多少堆栈驱动程序图表,但我已经检查过:

  • 拉取操作数与发布操作数一样多
  • 问题发生时ack操作计数低于pull操作计数

  • 此外,根据我们的日志,我能够看到 pubsub 实际上多次发送相同的消息,这也证实了“pull”成功但“ack”可能不成功。

    所以,我认为我们可以假设我们的系统会迅速拉动,但从 GCP 的角度来看并不能很好地 ACK。

    我检查了不按时发送 ACK 的可能性,但我认为情况并非如此,如下面的流程所示。

    在有问题的订阅中,消息被累积了几个小时。对我们来说,这是一个严重的问题。

    实现细节

    我们出于某种原因使用 pull 方法,并且我们不愿意切换到 push 方法,除非有充分的理由。对于每个订阅,我们都有一个消息泵 goroutine,这个 goroutine 为每个拉取的消息生成一个 worker。更具体,
    // in a dedicated message-pumping goroutine
    sub, _ := CreateSubscription(..., 0 /* ack-deadline */, )
    iter, _ := sub.Pull(...)
    for {
    // omitted: wait if we have too many workers
    msg, _ := iter.Next()
    go func(msg Message) {
    // omitted: handle the message and measure the latency; it turned out the latency is almost within 1 second
    msg.Done(true)
    }(msg)
    }

    对于负载均衡,订阅也会被同一集群中的其他 Pod 拉取。因此,对于一个订阅(如在 Google Pubsub 主题/订阅中),我们有多个订阅对象(如在 Go 绑定(bind)的订阅结构中),每个订阅对象仅在一个 pod 中使用。并且,每个订阅对象都会创建一个迭代器。我相信这个设置没有错,但如果我错了,请纠正我。

    正如这段代码所示,我们执行 ACK。 (我们的服务器不会 panic ;因此没有绕过 msg.Done() 的途径。)

    尝试

    奇怪的是,有问题的订阅并不忙。对于在同一个 pod 中接收更多消息的另一个订阅,我们通常不会有任何问题。因此,我开始怀疑 pull 操作的 max-prefetch 选项是否会影响。它似乎解决了一段时间的问题,但问题再次出现。

    根据 Google 支持的建议,我还增加了 pod 的数量,这有效地增加了工作人员的数量。这没有多大帮助。由于我们没有向有问题的问题发布很多消息(大约 1 条消息/秒)并且我们有很多(可能太多)工作人员,我不相信我们的服务器重载。

    有人可以对此有所了解吗?

    最佳答案

    就我而言,由于某种原因 Ack 没有返回的症状经常出现,未设置针对 gRPC 调用的超时,并且“acker”的 groutine 被阻塞。

    screen shot

    所以我通过从 pubsub.NewClient 传递 gRPC 选项来解决它。

    import (
    "cloud.google.com/go/pubsub"
    "google.golang.org/api/option"
    "google.golang.org/grpc"
    )

    // ...

    scChan := make(chan grpc.ServiceConfig)
    go func() {
    sc := grpc.ServiceConfig{
    Methods: map[string]grpc.MethodConfig{
    "/google.pubsub.v1.Subscriber/Acknowledge": {
    Timeout: 5 * time.Second,
    },
    },
    }
    scChan <- sc
    }()

    c, err := pubsub.NewClient(ctx, project, option.WithGRPCDialOption(grpc.WithServiceConfig(scChan)))

    您可以通过指定 grpc.EnableTracing = true 来调查原因。 .
    grpc.EnableTracing = true

    c, err := pubsub.NewClient(ctx, project)
    if err != nil {
    return nil, errors.Wrap(err, "pubsub.NewClient")
    }

    go func(){
    http.ListenAndServe(":8080", nil)
    }()

    gRPC的trace信息可以通过 golang.org/x/net/trace确认.

    关于google-cloud-pubsub - 确认的消息在 Google Pubsub 中徘徊,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41901108/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com