gpt4 book ai didi

java - 卡夫卡消费者: controlled reading from topic

转载 作者:行者123 更新时间:2023-11-30 06:07:03 24 4
gpt4 key购买 nike

我有下面的kafka消费者代码,其中3个线程正在从具有3个分区的kafka主题读取。

有没有办法,只有在线程当前正在处理的消息得到处理后,才会从 kafka 主题中读取新消息。

例如,假设主题中有 100 条消息,那么有什么方法可以一次只读取和处理 3 条消息。现在,当这 3 条消息得到处理后,只应读取接下来的 3 条消息,依此类推。

public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

// now launch all the threads
//
executor = Executors.newFixedThreadPool(3);

// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}

最佳答案

如果 ConsumerTest 内的迭代器同步处理消息,则一次只会消耗 3 条消息。默认情况下,enable.auto.commit 为 true。确保您没有将其设置为 false,否则您需要添加提交偏移量的逻辑。

前-

 ConsumerIterator<byte[], byte[]> streamIterator= stream.iterator(); 
while (streamIterator.hasNext()) {
String kafkaMsg= new String(streamIterator.next().message());
}

关于java - 卡夫卡消费者: controlled reading from topic,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42528912/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com