gpt4 book ai didi

bigdata - 弗林克+卡夫卡: Why am I losing messages?

转载 作者:行者123 更新时间:2023-12-01 16:40:44 27 4
gpt4 key购买 nike

我写了一个非常简单的 Flink 流作业,它使用 FlinkKafkaConsumer082 从 Kafka 获取数据。

protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) {
Properties result = new Properties();
result.put("bootstrap.servers", getBrokerUrl());
result.put("zookeeper.connect", getZookeeperUrl());
result.put("group.id", getGroup());

return env.addSource(
new FlinkKafkaConsumer082<>(
topic,
new SimpleStringSchema(), result);
}

这非常有效,每当我在 Kafka 的主题中放入一些东西时,它就会被我的 Flink 作业接收并处理。现在我试着看看如果我的 Flink Job 由于某种原因不在线会发生什么。于是我关闭了flink job,继续向Kafka发送消息。然后我再次启动我的 Flink 作业,并期望它能够处理同时发送的消息。

但是,我收到了这条消息:

No prior offsets found for some partitions in topic collector.Customer. Fetched the following start offsets [FetchPartition {partition=0, offset=25}]

所以它基本上忽略了自上次关闭 Flink 作业以来出现的所有消息,并在队列末尾开始读取。从我收集到的 FlinkKafkaConsumer082 的文档中,它会自动负责将已处理的偏移量与 Kafka 代理同步。然而,情况似乎并非如此。

我正在使用单节点 Kafka 安装(Kafka 发行版附带的那个)和单节点 Zookeper 安装(也是与 Kafka 发行版捆绑在一起的那个)。

我怀疑这是某种配置错误或类似的问题,但我真的不知道从哪里开始寻找。有没有其他人遇到过这个问题,也许已经解决了?

最佳答案

我找到了原因。您需要在 StreamExecutionEnvironment 中显式启用检查点,以使 Kafka 连接器将处理后的偏移量写入 Zookeeper。如果您不启用它,Kafka 连接器将不会写入最后读取的偏移量,因此当收集作业重新启动时它将无法从那里恢复。所以一定要写:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(); // <-- this is the important part

Anatoly 关于更改初始偏移量的建议可能仍然是一个好主意,以防检查点由于某种原因失败。

关于bigdata - 弗林克+卡夫卡: Why am I losing messages?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33501574/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com