gpt4 book ai didi

java - 卡夫卡流 RoundRobinPartitioner

转载 作者:行者123 更新时间:2023-11-30 05:20:36 26 4
gpt4 key购买 nike

我编写了一个kafka流代码,使用kafka 2.4 kafka客户端版本和kafka 2.2服务器版本。我的主题和内部主题有 50 个分区。

我的kafka流代码有selectKey() DSL操作,并且我有200万条使用相同KEY的记录。在流配置中,我已经完成了

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);

这样我就可以使用具有完全相同 key 的不同分区。如果我没有按预期使用循环法,我的所有消息都会发送到同一分区。

直到现在一切都很好,但我意识到;当我使用 RoundRobinPartitioner 类时,我的消息大约有 40 个分区。 10分区处于空闲状态。我想知道我错过了什么?它应该使用其中 50 条大约 200 万条记录,对吗?

      final KStream<String, IdListExportMessage> exportedDeviceIdsStream =
builder.stream("deviceIds");

// k: appId::deviceId, v: device
final KTable<String, Device> deviceTable = builder.table(
"device",
Consumed.with(Serdes.String(), deviceSerde)
);
// Some DSL operations
.join(
deviceTable,
(exportedDevice, device) -> {
exportedDevice.setDevice(device);

return exportedDevice;
},
Joined.with(Serdes.String(), exportedDeviceSerde, deviceSerde)
)
.selectKey((deviceId, exportedDevice) -> exportedDevice.getDevice().getId())
.to("bulk_consumer");

还有

   props.put(StreamsConfig.STATE_DIR_CONFIG, /tmp/kafka-streams);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
props.put("num.stream.threads", 10);
props.put("application.id", applicationId);

RoundRobinPartitioner.java

public class RoundRobinPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

public RoundRobinPartitioner() {
}

public void configure(Map<String, ?> configs) {
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
}

private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}

public void close() {
}
}

最佳答案

您无法使用 ProducerConfig.PARTITIONER_CLASS_CONFIG 更改分区配置——这只适用于普通生产者。

在Kafka Streams中,需要实现接口(interface)StreamsPartitioner并将您的实现传递给相应的运算符,例如 to("topic", Produced.streamPartitioner(new MyPartitioner()) .

关于java - 卡夫卡流 RoundRobinPartitioner,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59645127/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com