gpt4 book ai didi

java - 在 Kafka 中使用实时消息

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:13:43 25 4
gpt4 key购买 nike

我已经启动了我的 zookeeper 和 Kafka 服务器。我启动了我的 Kafka 制作人,它发送 10 条主题为“xxx”的消息。然后停止了我的 Kafka 生产者。现在我启动了我的 Kafka 消费者并订阅了主题“xxx”。我的消费者消费了我的 Kafka 生产者发送的那 10 条消息,它现在没有运行。我需要我的 Kafka 消费者应该只使用来自正在运行的 Kafka 服务器的消息。有什么办法可以做到这一点?关注我的消费属性中的内容。

props.put("bootstrap.servers", "localhost:9092");
String consumeGroup = "cg1";
props.put("group.id", consumeGroup);
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "100");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

最佳答案

设置以下属性:

consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

它告诉消费者只读取最新消息,即消费者启动后发布的消息。

关于java - 在 Kafka 中使用实时消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39788414/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com