gpt4 book ai didi

java - spring-boot 1.5.4 如果消费者(kube pod)重新启动,Spring Cloud Stream 手动偏移提交行为

转载 作者:行者123 更新时间:2023-12-01 16:37:49 25 4
gpt4 key购买 nike

您好,我们一直在使用旧的 spring 版本和 kafka 1.1,具有以下依赖项

+--- org.springframework.boot:spring-boot-starter-web: -> 1.5.4.RELEASE (*)
+--- org.springframework.cloud:spring-cloud-starter-config: -> 1.3.1.RELEASE (*)
+--- org.springframework.cloud:spring-cloud-starter-bus-kafka: -> 1.3.1.RELEASE (*)
+--- org.springframework.cloud:spring-cloud-stream-binder-kafka: -> 1.2.1.RELEASE (*)
+--- org.apache.kafka:kafka-clients:0.11.0.2 (*)
+--- org.apache.kafka:kafka_2.11:0.11.0.2 (*)

我在 application.yaml 文件中有以下配置

spring:
kafka:
properties.security.protocol: SSL
ssl:
key-password: ${ABC_KEY_PASSWORD}
keystore-location: file:${ABC_KEYSTORE}
keystore-password: ${ABC_KEYSTORE_PASSWORD}
truststore-location: file:${ABC_TRUSTSTORE}
truststore-password: ${ABC_TRUSTSTORE_PASSWORD}
consumer:
enable-auto-commit: false
cloud:
stream:
bindings:
feed_fcpostprocessor_event:
destination: app-fc-notification
binder: abckafka
group: app-fc-group
consumer:
partitioned: true
concurrency: 2
kafka:
binder:
brokers: ${ABC_BROKER_HOST:abc-kafka.aws.local:9092}
autoCreateTopics: true
autoAddPartitions: true
replicationFactor: 3
partitionCount: 2
bindings:
feed_fcpostprocessor_event:
consumer:
autoCommitOffset: false
binders:
abckafka:
type: kafka

我找到了link在堆栈溢出中。答案是

First, I am not sure what are your expectation for SpringApplication.exit(applicationContext,()-> 0);, but you're essentially bringing down the entire application with everything hat may be running there. Second, your message loss is due to the fact that Kafka binder has absolutely no knowledge that an exception had occurred and that it has to put message back on the topic.

这是否意味着 Kafka 不知道消费者已关闭,在这种情况下,消费者没有任何通信。我本以为会发生重新平衡,而且由于 kafka 级别的启用自动提交和绑定(bind)级别的 autoCommitOffset 设置为 false。该偏移量将不会提交,如果应用程序再次启动,它将从它错过的最后一个偏移量开始,或者如果发生重新平衡,则另一个使用者将从该分区的其他使用者失败的偏移量中读取。

我相信我的问题不是重复的,并且有切中要害的查询。如果有问题,请指出我的答案。否则请澄清该行为。谢谢

最佳答案

首先,尽管Boot 1.5不再受支持;你应该使用 spring-kafka 1.3.11 而不是 1.1;由于 KIP-62,它的线程模型要简单得多。在 1.3 之前,避免重新平衡的事情非常复杂。

但是,更好的是,您应该升级到所有项目的受支持版本,但至少将 spring-kafka 升级到 1.3。

您的期望是正确的;具有未提交偏移量的记录将被重新传送。

关于java - spring-boot 1.5.4 如果消费者(kube pod)重新启动,Spring Cloud Stream 手动偏移提交行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61924441/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com