gpt4 book ai didi

spring-boot - GCP PubSub Spring Boot 重复提取消息

转载 作者:行者123 更新时间:2023-12-04 17:36:29 25 4
gpt4 key购买 nike

我需要帮助解决 gcp pub/sus 的问题。我有一个进程将 100 strip 有过滤器的消息发送到 pubsub,另一个应用程序(在 spring boot 中)接收这些消息。当 spring boot 应用程序从 pubsub(不是拉取)接收消息时,处理 100 条消息,但是,进入进程,接收更多消息,在不同的时间接收不同数量的消息,任何时候接收 120,另一个 140,其他超过 200。我没有找到任何解决方案,这是我的代码:

    @Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver() {
return message -> {
System.out.println("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
//other process of app (call other api)
AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
consumer.ack();
};
}

请帮帮我!!!

最佳答案

在 Google Cloud Pub/Sub 中,由于不同的原因可能会出现重复的消息。要记住的一件事是 Cloud Pub/Sub 提供至少一次交付,这意味着一定数量的重复始终是可能的,因此您的应用程序必须对它们具有弹性。不过,那么多重复项似乎确实有点高。一般来说,重复通常可能由于以下原因而发生:

  1. 消息被发布者发送了不止一次。如果发布者与 Cloud Pub/Sub 断开连接并再次发送相同的消息,就会发生这种情况。如果出现这种类型的重复,则消息将具有不同的消息 ID。
  2. 订阅者确认消息的时间太长。在您的代码中,您有 //应用程序的其他进程(调用其他 api)。这个过程需要多长时间?如果它比确认消息的截止时间长,则消息将被重新传递。请记住,如果此其他进程需要为所有消息获取锁,则可能会出现争用问题,因为有太多请求试图同时获取这些锁,从而导致处理延迟。默认情况下,消息的确认截止时间为十秒。使用 Java 客户端库时,截止日期会自动延长 maxAckExtensionPeriod ,默认为一小时。此属性可以在 DefaultSubscriberFactory 中设置也适用于 Spring 。
  3. 消息根本不被确认。如果异常阻止了对 ack 的调用,或者存在死锁导致永远无法到达该行代码,则将重新传递消息。
  4. 用例是 large backlog of small messages 之一.在这种情况下,缓冲区很容易以导致重新传递消息的方式填满客户端。

关于spring-boot - GCP PubSub Spring Boot 重复提取消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56535068/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com