gpt4 book ai didi

apache-kafka - Apache Kafka 0.9.0.0 显示所有带有分区的主题

转载 作者:行者123 更新时间:2023-12-04 04:59:55 24 4
gpt4 key购买 nike

我目前正在评估 Apache Kafka,我有一个简单的使用者,应该从特定的主题分区读取消息。这是我的客户:

public static void main(String args[]) {

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

TopicPartition partition0 = new TopicPartition("test_topic", Integer.parseInt(args[0]));

ArrayList topicAssignment = new ArrayList();
topicAssignment.add(partition0);
consumer.assign(topicAssignment);

//consumer.subscribe(Arrays.asList("test_topic"));
int commitInterval = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
if (buffer.size() >= commitInterval) {
process(buffer);
consumer.commitSync();
buffer.clear();
}
}
}
}

static void process(List<ConsumerRecord<String, String>> buffers) {
for (ConsumerRecord<String, String> buffer : buffers) {
System.out.println(buffer);
}
}

这是我用来启动 Apache Kafka 的命令:
bin/kafka-server-start.sh config/server.properties & bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test_topic

正如您在此处看到的,我正在创建具有 2 个分区(p0 和 p1)的主题!

然后,我使用以下命令启动我的消费者的两个实例:

对于消费者 1:
java -cp target/scala-2.11/kafka-consumer-0.1.0-SNAPAHOT.jar com.test.api.consumer.KafkaConsumer09Java 0

对于消费者 2:
java -cp target/scala-2.11/kafka-consumer-0.1.0-SNAPAHOT.jar com.test.api.consumer.KafkaConsumer09Java 1

其中 0 和 1 表示我希望我的消费者从中读取消息的实际分区。

但是发生的情况是只有我的消费者 1 收到了所有消息。我的印象是来自生产者的消息最终在分区上是平等的。

我使用以下命令查看我的主题 test_topic 有多少个分区:
Joes-MacBook-Pro:kafka_2.11-0.9.0.0 joe$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --broker-info --group test --topic test_topic --zookeeper localhost:2181
[2016-01-14 13:36:48,831] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group Topic Pid Offset logSize Lag Owner
test test_topic 0 10000 10000 0 none
BROKER INFO
0 -> 172.22.4.34:9092

为什么即使我告诉Kafka为test_topic创建2个分区也只有一个分区?

这是我的制作人:
  def main(args: Array[String]) {
//val conf = new SparkConf().setAppName("VPP metrics producer")
//val sc = new SparkContext(conf)

val props: Properties = new Properties()
props.put("metadata.broker.list", "localhost:9092,localhost:9093")
props.put("serializer.class", "kafka.serializer.StringEncoder")

val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)

1 to 10000 map {
case i =>
val jsonStr = getRandomTsDataPoint().toJson.toString
println(s"sending message $i to kafka")
producer.send(new KeyedMessage[String, String]("test_topic", jsonStr))
println(s"sent message $i to kafka")
}
}

最佳答案

如果您使用 2 创建主题,我不确定为什么您会有 1 个分区。我从来没有遇到过,这是肯定的。

你能试试这个吗:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic
这应该会告诉你有多少分区真的存在。

然后,如果真的有 1 个分区,也许您可​​以通过创建一个新主题来重新开始:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test_topic_2

然后尝试:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic_2
...并报告调查结果。

关于apache-kafka - Apache Kafka 0.9.0.0 显示所有带有分区的主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34789999/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com