gpt4 book ai didi

java - 如何安全地取消订阅 Kafka 中的主题

转载 作者:太空宇宙 更新时间:2023-11-04 09:02:55 25 4
gpt4 key购买 nike

我有一个简单的java程序(dockerized)并部署在kubernetes(pod)中这个java程序只是一个普通的java项目,它监听并消费特定的主题。例如。样本安全主题

我必须安全地取消订阅这个主题,这意味着即使我删除了这个 pod,也不会丢失任何数据(java 消费者)。

这是我通过搜索看到的代码:

 public static void unsubscribeSafelyFromKafka() {  

logger.debug("Safely unsubscribe to topic..");

if (myKakfaConsumer != null) {
myKafkaConsumer.unsubscribe();
myKafkaConsumer.close();
}
}

我需要通过命令行运行它,其中 Java 程序已经有一个现有的静态 main 方法。

我的问题是:

  1. 上面的代码能保证记录不会丢失吗?
  2. 当已经存在静态 main() 时,如何通过命令行触发上面的代码

注意:我通过命令行运行 java 项目。例如。 java -jar MyKafkaConsumer.jar 因为这是要求。

请帮忙

最佳答案

如果我对问题 1 的理解正确,您担心的是,在通过控制台命令触发的一个线程取消订阅后,轮询使用者可能会处理一批记录,如果 pod 被杀死,这些记录可能会丢失?

如果您有其他 Pod 作为同一消费者组的一部分进行消费,或者如果该 Pod 或任何 Pod 使用相同的组 ID 再次订阅,则最后提交的偏移量将确保不会丢失任何记录(尽管有些记录可能会被处理多次),因为这是接管的消费者的起始位置。

如果您使用最安全的自动提交,因为每次提交都发生在后续轮询中,因此您不可能提交尚未处理的记录(只要您不生成额外的线程来进行处理)。手动提交让您决定何时处理记录以及何时可以安全提交。

但是,取消订阅后调用 close 是一个好主意,并且应该确保当前轮询批处理的干净完成并提交最终偏移量,只要这一切都在超时时间内发生。

关于问题 2,如果您需要手动取消订阅,那么我认为您需要 JMX 或公开 API 或类似的方法来调用正在运行的 JVM 上的方法。但是,如果您只是想确保 Pod 终止时安全关闭,您可以在关闭 Hook 中取消订阅,或者不用担心,因为偏移量提交提供了安全性。

关于java - 如何安全地取消订阅 Kafka 中的主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60561472/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com