gpt4 book ai didi

java - 如果一个分区受到限制,如何对 kafka 中的剩余分区应用循环法

转载 作者:太空宇宙 更新时间:2023-11-04 10:41:55 24 4
gpt4 key购买 nike

我限制了主题的一个分区用于特定服务(因此所有请求都将到达此处以获取服务 X)。对于任何其他服务请求将到达剩余的 N 个分区。

在java中,我通过org.apache.kafka.clients. Producer.Partitioner接口(interface)实现了它。

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

String partitionKey = (String) key;

if(Channel.DB.getValue().equalsIgnoreCase(partitionKey) && ( KafkaTopic.TRANS.getValue().equalsIgnoreCase(topic) || KafkaTopic.CONS.getValue().equalsIgnoreCase(topic) )){
return 1; // this is reserved for SERVICE X only
}

return 0; // here i want to produce messages on remaining partitions, how to return partition now?
}

问题:1:如何返回分区号在这种情况下2:如何以循环方式生成其他消息,不包括服务 X 的分区。

我正在使用 Apache Kafka 9.0.1。

最佳答案

下面的代码对我有用 - 这里的想法是,当 key 不适用于保留分区时,您可以从可用分区列表中删除该特定分区,并对剩余分区进行循环。

private final AtomicInteger counter = new AtomicInteger(0);

public static final int SPECIAL_PARTITION_ID = 1;

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();

String partitionKey = (String) key;

if ("SPECIAL_CUSTOMER".equals(partitionKey)) {
LOGGER.info("PARTITION= " + SPECIAL_PARTITION_ID);
return SPECIAL_PARTITION_ID; //special partition reserved for MY_SPECIAL_CUSTOMER
} else {
int nextValue = counter.getAndIncrement();

List<PartitionInfo> availablePartitions = new ArrayList<>(cluster.availablePartitionsForTopic(topic));

if (availablePartitions.size() > 0) {

PartitionInfo specialPartition = null;

for (PartitionInfo partitionInfo : availablePartitions) {
if (partitionInfo.partition() == SPECIAL_PARTITION_ID) {
specialPartition = partitionInfo;
break;
}
}

availablePartitions.remove(specialPartition);

int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
//optional -- depending upon your usecase
while (true) {
int p = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
if(p != SPECIAL_PARTITION_ID) {
return p;
}
}
}
}
}

如果您可以只确保一个键始终进入保留分区,而其他键可能会进行循环(包括特殊分区),那么您可以通过在键用于保留分区时传递partitionId来轻松实现它,否则根本不传递键,这可以节省您编写自定义分区程序。

此外,如果您不介意保留分区是最后一个分区,其余分区则分配给其他分区,则可能有一个更简单的实现(摘自《Kafka:权威指南》一书)

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {

public void configure(Map<String, ?> configs) {} 1

public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes,
Cluster cluster) {
List<PartitionInfo> partitions =
cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();

if ((keyBytes == null) || (!(key instanceOf String))) 2
throw new InvalidRecordException("We expect all messages
to have customer name as key")

if (((String) key).equals("Banana"))
return numPartitions; // Banana will always go to last
partition

// Other records will get hashed to the rest of the
partitions
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
}

public void close() {}
}

关于java - 如果一个分区受到限制,如何对 kafka 中的剩余分区应用循环法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48885460/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com