gpt4 book ai didi

apache-kafka - Spring Kafka 分区

转载 作者:行者123 更新时间:2023-12-03 15:45:58 26 4
gpt4 key购买 nike

以下两个代码片段发布消息的行为有何不同?

方法一

Message<String> message = MessageBuilder.withPayload("testmsg")
.setHeader(KafkaHeaders.MESSAGE_KEY, "key").setHeader(KafkaHeaders.TOPIC, "test").build();

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);

方法二
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", "testmsg");

主题配置:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0

观察:

如果有 3 个消费者,每个分区一个;方法 1 导致来自单个分区的单个消费者消耗的所有消息。使用方法 2;消费在 3 个分区/消费者之间平均分配。

最佳答案

但是您的代码中有答案。
第一个与 topic 一起提供 messageKey .
messageKey如果未明确指定,则真正用于确定目标分区:

/**
* computes partition for given record.
* if the record has partition returns the value otherwise
* calls configured partitioner class to compute the partition.
*/
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

哪里 DefaultPartitioner做这个:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
...
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

因此,所有具有相同 key 的消息发送到同一个分区。否则,它们将以循环方式放置到主题中。

关于apache-kafka - Spring Kafka 分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45556142/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com