gpt4 book ai didi

java - 如何通过KafkaConsumer可靠地获取所有kafka主题消息

转载 作者:行者123 更新时间:2023-12-02 11:54:19 25 4
gpt4 key购买 nike

下面的getMessages()方法有时获取kafka主题的所有消息。此代码在页面加载时在 Web 应用程序中执行。有时没有消息返回,有时所有消息都返回。

有没有办法设置属性和/或更改代码,以便所有消息每次都会返回?

public List<String> getMessages() {
List<String> messages = new ArrayList<>();
try {
ConnectionKafka connection = ConstantsHome.connectionManager.getConnectionDef(getGuid(), ConnectionKafka.class);
Properties props = new Properties();
props.put("bootstrap.servers", connection.getProps().get("bootstrapServers"));
props.put("group.id", getName());
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton(getName()));
consumer.poll(0);
consumer.seekToBeginning(consumer.assignment());
ConsumerRecords<String, String> records = consumer.poll(0);
for (ConsumerRecord<String, String> record : records) {
messages.add(
String.format("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value())
);
}
consumer.close(0, TimeUnit.MILLISECONDS);
} catch (Exception e) {
Utils.writeToLog(e, getClass().getName(), "", IErrorManager.ERROR);
}
Collections.sort(messages, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return Integer.valueOf(o1.substring("offset = ".length(), o1.indexOf(","))) -
Integer.valueOf(o2.substring("offset = ".length(), o2.indexOf(",")));
}
});
return messages;
}

最佳答案

如果您的期望是获取每次调用的所有消息,您应该正确设置以下内容

enable.auto.commit = false

另一个选项是为每次迭代创建一个动态组 ID,考虑到组元数据存储在 kafka 端,我会避免使用此选项。

关于java - 如何通过KafkaConsumer可靠地获取所有kafka主题消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47702994/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com