gpt4 book ai didi

apache-kafka - 当我重新运行 Flink 消费者时,Kafka 再次消费最新的消息

转载 作者:行者123 更新时间:2023-12-01 10:35:33 24 4
gpt4 key购买 nike

我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者。每当我从一个主题传递一些消息时,它都会及时接收它们。但是,当我重新启动消费者时,它没有接收到新的或未使用的消息,而是使用了发送到该主题的最新消息。

这是我正在做的:

  1. 运行生产者:

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
  2. 运行消费者:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val st = env
    .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
    env.enableCheckpointing(5000)
    st.print()
    env.execute()
  3. 传递一些消息

  4. 停止消费者
  5. 再次运行消费者打印我发送的最后一条消息。我希望它只打印新消息。

最佳答案

您正在运行一个检查点间隔为 5 秒的 Kafka 消费者。因此,每 5 秒,Flink 就会创建一个操作符状态(偏移量)的副本以供恢复。

检查点完成后,它会让运算符(operator)知道检查点已完成。根据该通知,Kafka 消费者将偏移量提交给 Zookeeper。所以大约每 5 秒,我们将最后一个检查点的偏移量写入 ZK。

当你再次启动 Flink 作业时,它会在 ZK 中找到偏移量并从那里继续。根据时间的不同,所有提交到 ZK 之后收到的消息都会重新发送。

您无法避免此行为,因为 .print() “运算符”不是检查点的一部分。它的意思是调试实用程序。然而,参与检查点的数据接收器(例如滚动文件接收器)将确保没有重复写入文件系统。

关于apache-kafka - 当我重新运行 Flink 消费者时,Kafka 再次消费最新的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36171125/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com