gpt4 book ai didi

apache-kafka - 我怎么知道我已经消费了所有 Kafka 主题?

转载 作者:行者123 更新时间:2023-12-01 12:15:41 24 4
gpt4 key购买 nike

我正在使用 Flink v1.4.0。我正在使用 Kafka FLink Consumer 根据以下代码从 Kafka 主题中获取数据:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
myConsumer.setStartFromLatest(); // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour

DataStream<String> stream = env.addSource(myConsumer);
...

有没有办法知道我是否已经读完了整个主题?如何监控偏移量? (这是确认我已经使用了 Kafka 主题中的所有数据的充分方法吗?)

最佳答案

由于 Kafka 通常与连续数据流一起使用,因此消费“所有”主题可能是也可能不是一个有意义的概念。我建议你看看documentation on how Flink exposes Kafka's metrics ,其中包括以下解释:

The difference between the committed offset and the most recent offset in 
each partition is called the consumer lag. If the Flink topology is consuming
the data slower from the topic than new data is added, the lag will increase
and the consumer will fall behind. For large production deployments we
recommend monitoring that metric to avoid increasing latency.

因此,如果消费者滞后为零,您就被追上了。也就是说,您可能希望能够自己比较偏移量,但我不知道有什么简单的方法可以做到这一点。

关于apache-kafka - 我怎么知道我已经消费了所有 Kafka 主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48427775/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com