gpt4 book ai didi

apache-kafka - 如何使用 Kafka Consumer API 中的 key 读取数据?

转载 作者:行者123 更新时间:2023-12-04 04:54:53 33 4
gpt4 key购买 nike

我正在使用以下代码构建消息...

Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
KeyedMessage<String, String> keyedMsg = new KeyedMessage<String, String>(topic, "device-420", "{message:'hello world'}");
producer.send(keyedMsg);

并使用以下代码块消费......
//Key = topic name, Value = No. of threads for topic
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, 1);

//ConsumerConnector creates the message stream for each topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConnector.createMessageStreams(topicCount);

// Get Kafka stream for topic
List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic);

// Iterate stream using ConsumerIterator
for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
while (consumerIte.hasNext()) {
MessageAndMetadata<byte[], byte[]> msg = consumerIte.next();
System.out.println(topic.toUpperCase() + ">"
+ " Partition:" + msg.partition()
+ " | Key:"+ new String(msg.key())
+ " | Offset:" + msg.offset()
+ " | Message:"+ new String(msg.message()));
}
}

一切正常,因为我正在明智地阅读数据主题。所以我想知道有没有办法使用消息 来使用数据关键 设备 420 在这个例子中?

最佳答案

简短的回答:没有。

Kafka 中最小的粒度是分区。您可以编写一个只从单个分区读取的客户端。但是,一个分区可以包含多个键,您需要使用该分区中包含的所有键。

关于apache-kafka - 如何使用 Kafka Consumer API 中的 key 读取数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37435040/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com