gpt4 book ai didi

java - Kafka Producer将消息发布到单个分区

转载 作者:行者123 更新时间:2023-12-02 09:28:53 26 4
gpt4 key购买 nike

我是 Kafka 新手,正在阅读可用的官方文档。

在我的本地系统上,我已经启动了一个kafka实例以及zookeeper。 Zookeper 和 kafka 服务器都在默认端口上运行。

我创建了一个主题“test”,复制因子为 1,因为我只有一个 kafka 实例启动并运行。

随之我创建了两个分区。

我有两个消费者在同一消费者组内订阅了此队列。

现在我已经在 Windows 机器上使用命令提示符启动了消费者。

当我从命令提示符启动生产者并将消息发布到主题时,一切正常。 Kafka 使用循环法将消息推送到两个分区,并且每个消费者交替接收消息,因为每个消费者都在监听不同的分区。

但是当我使用 java kafka-client jar 创建生产者时,即使我对消息使用不同的 key ,生产者也会将所有消息推送到同一个分区,所有消息都在同一个消费者上接收。

分区不是静态的,而且每次我运行生产者时它都会不断变化。

我尝试了与从命令提示符启动的生产者相同的场景,其配置与我使用 java 代码向 kafka-client 生产者提供的配置完全相同。命令提示符生成器似乎工作正常,但代码生成器将所有消息推送到同一分区。

我尝试更改某些消息的 key ,希望代理将其发送到不同的分区,因为文档中提到代理使用消息的 key 路由消息。

public class KafkaProducerParallel {


public static void main(String[] args) throws InterruptedException,
ExecutionException {

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "parallelism-
producer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
LongSerializer.class);


Producer<String, Long> parallelProducer = new KafkaProducer<>
(properties);

for(long i=0;i<100;i++) {

ProducerRecord<String, Long> producerRecord;

if(i<50) {
producerRecord = new ProducerRecord<String,
Long>("second-topic", "Amoeba", i);
}else {
producerRecord = new ProducerRecord<String,
Long>("second-topic", "Bacteria", i);
}

RecordMetadata recordMetadata =
parallelProducer.send(producerRecord).get();

System.out.printf("Sent record : with key %s and value
%d to partition %s", producerRecord.key(), producerRecord.value(),
recordMetadata.partition());
System.out.println();
}

parallelProducer.close();


}

}

根据文档,kafka 代理通过使用 key (生成 key 的哈希值)来决定将特定消息放入哪个分区。我在一段时间后更改记录的 key ,但消息仍然每次都会发送到同一个分区。

代码的示例控制台输出:

  Sent record : with key Amoeba and value 0 to partition 1
Sent record : with key Amoeba and value 1 to partition 1
Sent record : with key Amoeba and value 2 to partition 1
Sent record : with key Amoeba and value 3 to partition 1
Sent record : with key Amoeba and value 4 to partition 1
Sent record : with key Amoeba and value 5 to partition 1
Sent record : with key Amoeba and value 6 to partition 1
Sent record : with key Amoeba and value 7 to partition 1
Sent record : with key Amoeba and value 8 to partition 1
Sent record : with key Amoeba and value 9 to partition 1
Sent record : with key Amoeba and value 10 to partition 1
Sent record : with key Amoeba and value 11 to partition 1
Sent record : with key Amoeba and value 12 to partition 1
Sent record : with key Amoeba and value 13 to partition 1

Sent record : with key Bacteria and value 87 to partition 1
Sent record : with key Bacteria and value 88 to partition 1
Sent record : with key Bacteria and value 89 to partition 1
Sent record : with key Bacteria and value 90 to partition 1
Sent record : with key Bacteria and value 91 to partition 1
Sent record : with key Bacteria and value 92 to partition 1
Sent record : with key Bacteria and value 93 to partition 1
Sent record : with key Bacteria and value 94 to partition 1
Sent record : with key Bacteria and value 95 to partition 1
Sent record : with key Bacteria and value 96 to partition 1
Sent record : with key Bacteria and value 97 to partition 1
Sent record : with key Bacteria and value 98 to partition 1
Sent record : with key Bacteria and value 99 to partition 1

最佳答案

从 Apache Kafka 2.4 版及更高版本开始,对于具有 null 键的记录,默认分区策略已更改,粘性分区是默认行为。

之前的循环策略意味着带有空键的记录将被分割到各个分区,新的粘性分区策略将记录发送到同一个分区,直到分区的批处理“完成”(这是由batch.size或linger定义的) .ms)

查看这篇文章了解更多信息: Improvements with Sticky Partitioner

关于java - Kafka Producer将消息发布到单个分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58132107/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com