gpt4 book ai didi

java - Flink - 查询Kafka主题以获取消费者组的偏移量?

转载 作者:行者123 更新时间:2023-11-30 10:26:26 25 4
gpt4 key购买 nike

问题:如何在 flink 代码中查询 kafka 主题以获取特定消费者组的偏移量? (还有附带问题(如果需要,我会在这里提出一个新问题)。如果可能的话,我怎样才能获得该偏移量的时间戳?

(我发现有 cli 工具可以查询它,但这不是我想要的,因为它不是在 flink 作业中以编程方式完成的)

关于整个问题的一些额外背景,但我不想让它过于开放。

我有一个用例,其中数据将从 kafkaTopic1 流入程序(我们称之为 P1),进行处理,然后持久保存到数据库中。 P1 将在一个多节点集群上,因此每个节点将处理多个 kafka 分区(假设该主题有 5 个节点和 50 个 kafka 分区)。如果其中一个节点由于某种原因完全失败并且正在处理数据,那么该数据将会丢失。

比如kafkaTopic1上有500条消息,node2已经拉取了10条消息(那么根据offset下一条要拉取的消息是11条消息)但是当节点失败,仍在处理的 2 将丢失。当节点重新启动时,它将开始读取消息 11,跳过两条丢失的消息(从技术上讲,kafka 分区将开始将其消息发送到另一个节点进行处理,因此该分区的偏移量将移动,我们不会必须确切地知道节点死亡时下一个要处理的消息)。

(注意:当节点死亡时,假设用户注意到并完全关闭 P1,因此此时不会处理更多数据,暂时)。

这就是 flink 发挥作用的地方。我想做一个 flink 作业,可以通过用户的参数告诉 P1 的消费者组,然后查询 kafka 主题(也由用户提供)以获取当前偏移量(OS1)。然后,flink 作业将其 kafkaTopic1 的偏移量设置为 OS1 之前的 X 时间量(X 由用户通过 args 提供)并开始从 kafka 主题读取消息。然后它将读取的每条消息与数据库中的消息进行比较,如果在数据库中没有找到该消息,它会将其发送到另一个 kafka 主题 (kafkaTopic2),以便在 P1 重新启动时进行处理。

最佳答案

如果在 flink 作业中启用了检查点,那么您不应该丢失消息,因为 flink 也在内部维护偏移量,并且在从故障中恢复后,它应该从 flink 上次提交的偏移量读取。

现在,如果您仍然想找到偏移量并重新从偏移量读取,这会变得很棘手,因为您需要找到给定消费者组给定主题的所有分区的偏移量。

我不知道如何从开箱即用的 Flink-kafka-Consumer API 做到这一点,但您可以将 kafka 依赖项添加到您的项目并从 Kafka API 创建一个 kafkaconsumer。一旦有了消费者,就可以调用

consumer.position(partition) 

consumer.committed(partition)

Mind that you still need to loop over all the partitions to get all the current offsets

在此处了解差异:Kafka Javadoc

一旦有了要读取的偏移量,就可以使用类似以下内容在 flink 作业中手动指定消费者偏移量:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

有关 Flink-kafka-Consumer 的更多信息,请查看 Flink Kafka Connector

关于java - Flink - 查询Kafka主题以获取消费者组的偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45718240/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com