gpt4 book ai didi

java - Apache Kafka Java 消费者未收到复制因子大于 1 的主题的消息

转载 作者:行者123 更新时间:2023-12-02 09:34:59 25 4
gpt4 key购买 nike

我开始使用 Apache Kakfa,使用 Java 编写一个简单的生产者、消费者应用程序。我正在使用kafka-clients版本0.10.0.1并在 Mac 上运行它。

我创建了一个名为 replicated_topic_partitioned 的主题有 3 个分区,复制因子为 3。

我在端口 2181 启动了 Zookeeper。我分别在端口 9092、9093 和 9094 上启动了 id 为 1、2 和 3 的三个代理。

这是描述命令的输出

kafka_2.12-2.3.0/bin/kafka-topics.sh --describe --topic replicated_topic_partitioned --bootstrap-server localhost:9092    
Topic:replicated_topic_partitioned PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: replicated_topic_partitioned Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: replicated_topic_partitioned Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: replicated_topic_partitioned Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1

我编写了一个简单的生产者和消费者代码。生产者成功运行并发布了消息。但是当我启动消费者时,轮询调用只是无限期地等待。在调试时,我发现它在 ConsumerNetworkClient 上的awaitMetadataUpdate 方法处不断循环。

这是生产者和消费者的代码

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> myProducer = new KafkaProducer<>(properties);
DateFormat dtFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
String topic = "replicated_topic_partitioned";

int numberOfRecords = 10;
try {
for (int i = 0; i < numberOfRecords; i++) {
String message = String.format("Message: %s sent at %s", Integer.toString(i), dtFormat.format(new Date()));
System.out.println("Sending " + message);
myProducer.send(new ProducerRecord<String, String>(topic, message));

}
} catch (Exception e) {
e.printStackTrace();
} finally {
myProducer.close();
}

消费者.java

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", UUID.randomUUID().toString());
properties.put("auto.offset.reset", "earliest");

KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(properties);

String topic = "replicated_topic_partitioned";
myConsumer.subscribe(Collections.singletonList(topic));

try {
while (true){
ConsumerRecords<String, String> records = myConsumer.poll(1000);
printRecords(records);
}
} finally {
myConsumer.close();
}

server.properties 添加一些关键字段

broker.id=1 
host.name=localhost
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs-1
num.partitions=1
num.recovery.threads.per.data.dir=1

transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

另外两个代理的 server.properties 是上述代理的副本,其中 Broker.id、端口和 log.dirs 已更改。

这对我不起作用: Kafka 0.9.0.1 Java Consumer stuck in awaitMetadataUpdate()

<小时/>

但是,如果我从传递分区的命令行启动使用者,它会成功读取该分区的消息。但当指定主题时,它不会收到任何消息。

作品:

kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092 
--from-beginning --partition 1

不起作用:

kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092 
--from-beginning

注意:上述消费者非常适合复制因子等于 1 的主题。

问题:

  1. 为什么 Java Producer 不读取复制因子大于 1 的主题的任何消息(即使将其分配给分区)(如 myConsumer.assign(Collections.singletonList(new TopicPartition(topic, 2) )?

  2. 为什么控制台使用者仅在传递分区时才读取消息(同样适用于复制因子为 1 的主题)

最佳答案

因此,您要发送 10 条记录,但所有 10 条记录都具有相同的 key :

for (int i = 0; i < numberOfRecords; i++) {
String message = String.format("Message: %s sent at %s", Integer.toString(i), dtFormat.format(new Date()));
System.out.println("Sending " + message);
myProducer.send(new ProducerRecord<String, String>(topic, message)); <--- KEY=topic
}

除非另有说明(通过直接在 ProducerRecord 上设置分区),记录传送到的分区由以下内容确定:

partition = murmur2(serialize(key)) % numPartitions

所以相同的键意味着相同的分区。

您是否尝试过在分区 0 和 2 上搜索 10 条记录?

如果您希望记录在分区之间更好地“分布”,请使用空键(您将获得循环)或可变键。

关于java - Apache Kafka Java 消费者未收到复制因子大于 1 的主题的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57596903/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com