gpt4 book ai didi

java - 使用 JAVA 的 Kafka 消费者

转载 作者:搜寻专家 更新时间:2023-11-01 02:59:33 25 4
gpt4 key购买 nike

我有一个包含多个主题的 Kafka-Broker,每个主题都有一个分区。

我有一个消费者可以很好地消费来自主题的消息

我的问题是我需要通过增加分区数来提高消息队列的吞吐量,比如说我在一个主题上有四个分区,有没有办法让我写四个消费者,每个消费者都指向主题上的各个分区话题???

import java.util.*;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
private ConsumerConnector consumerConnector = null;
private final String topic = "mytopic";

public void initialize() {
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "testgroup");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "300");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig conConfig = new ConsumerConfig(props);
consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
}

public void consume() {
//Key = topic name, Value = No. of threads for topic
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, new Integer(1));

//ConsumerConnector creates the message stream for each topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
consumerConnector.createMessageStreams(topicCount);

// Get Kafka stream for topic 'mytopic'
List<KafkaStream<byte[], byte[]>> kStreamList =
consumerStreams.get(topic);
// Iterate stream using ConsumerIterator
for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();

while (consumerIte.hasNext())
System.out.println("Message consumed from topic
[" + topic + "] : " +
new String(consumerIte.next().message()));
}
//Shutdown the consumer connector
if (consumerConnector != null) consumerConnector.shutdown();
}

public static void main(String[] args) throws InterruptedException {
KafkaConsumer kafkaConsumer = new KafkaConsumer();
// Configure Kafka consumer
kafkaConsumer.initialize();
// Start consumption
kafkaConsumer.consume();
}

}

最佳答案

从本质上讲,您需要做的就是启动几个都在同一个消费者组中的消费者。如果您使用的是 kafka 0.9 或更高版本的新消费者,或者如果您使用的是高级消费者,kafka 将负责划分分区,确保每个分区由一个消费者读取。如果你有比消费者更多的分区,那么一些消费者会从多个分区接收消息,但是没有一个分区会被来自同一消费者组的多个消费者读取,以确保消息不会重复。所以你永远不需要比分区更多的消费者,因为一些消费者会闲置。您还可以使用简单的消费者 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example 微调哪个消费者读取每个分区

看来您使用的是 Kafka 0.8 或更早版本的旧消费者。您可能需要考虑切换到新的消费者。 http://kafka.apache.org/documentation.html#intro_consumers

这里有另一篇很好的文章,里面有使用新消费者编写消费者的详细例子:http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

关于java - 使用 JAVA 的 Kafka 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39645589/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com