gpt4 book ai didi

apache-kafka - Kafka 高级消费者使用 Java API 从主题中获取所有消息(相当于 --from-beginning)

转载 作者:行者123 更新时间:2023-12-04 10:18:24 24 4
gpt4 key购买 nike

我正在使用来自 Kafka 站点的 ConsumerGroupExample 代码测试 Kafka 高级消费者。我想检索关于我在 Kafka 服务器配置中拥有的名为“test”的主题的所有现有消息。查看其他博客, auto.offset.reset 应该设置为“最小”才能获取所有消息:

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId)    {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "10000");

return new ConsumerConfig(props);
}

我真正的问题是:对于高级消费者的等效 Java api 调用是什么,相当于:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

最佳答案

基本上,每次新消费者尝试消费一个主题时,它都会从头开始读取消息。如果您每次只是为了测试目的而从头开始消费,那么每次您使用新的 groupID 初始化您的消费者时,它都会从头开始读取消息。这是我如何做到的:

properties.put("group.id", UUID.randomUUID().toString());

并每次从头开始阅读消息!

关于apache-kafka - Kafka 高级消费者使用 Java API 从主题中获取所有消息(相当于 --from-beginning),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21762406/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com