gpt4 book ai didi

java - 消费者阅读 __consumer_offsets 传递不可读的消息

转载 作者:行者123 更新时间:2023-11-29 04:16:10 26 4
gpt4 key购买 nike

我正在尝试从 __consumer_offsets 主题中消费,因为这似乎是检索有关消费者的 kafka 指标(如消息滞后等)的最简单方法。理想的方法是从 jmx 访问它,但想先尝试这个和消息回来似乎是加密的或不可读的形式。也尝试添加 stringDeserializer 属性。有人对如何纠正此问题有任何建议吗?再次提到 this 是

的副本

duplicate consumer_offset

没有帮助,因为它没有引用我的问题,即在 java 中将消息作为字符串读取。还更新了代码以尝试使用 kafka.client 消费者的 consumerRecord。

consumerProps.put("exclude.internal.topics",  false);
consumerProps.put("group.id" , groupId);
consumerProps.put("zookeeper.connect", zooKeeper);


consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(
consumerConfig);

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

for (KafkaStream stream : streams) {

ConsumerIterator<byte[], byte[]> it = stream.iterator();

//errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");

while (it.hasNext()) {
try {

String mesg = new String(it.next().message());
System.out.println( mesg);

代码更改:

try {       
// errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");
Properties consumerProps = new Properties();
consumerProps.put("exclude.internal.topics", false);
consumerProps.put("group.id" , "test");
consumerProps.put("bootstrap.servers", servers);
consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

//ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
//ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
// consumerConfig);

//Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//topicCountMap.put(topic, new Integer(1));
//Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
//List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps);
kconsumer.subscribe(Arrays.asList(topic));

try {
while (true) {
ConsumerRecords<String, String> records = kconsumer.poll(10);

for (ConsumerRecord<String, String> record : records)

System.out.println(record.offset() + ": " + record.value());
}
} finally {
kconsumer.close();
}

下面是消息的快照;在图像的底部:

consumer offset

最佳答案

虽然可以直接从 __consumer_offsets 主题中读取,但这不是推荐的或最简单的方法。

如果可以使用Kafka 2.0,最好是使用AdminClient API来描述组:


如果您绝对想直接从__consumer_offset 中读取,您需要对记录进行解码以使其易于阅读。这可以使用 GroupMetadataManager 类来完成:

answer从您链接的问题中包含执行所有这些的框架代码。

另请注意,您不应将记录反序列化为字符串,而应将它们保留为原始字节,以便这些方法能够正确解码它们。

关于java - 消费者阅读 __consumer_offsets 传递不可读的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52278165/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com