- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我限制了主题的一个分区用于特定服务(因此所有请求都将到达此处以获取服务 X)。对于任何其他服务请求将到达剩余的 N 个分区。
在java中,我通过org.apache.kafka.clients. Producer.Partitioner
接口(interface)实现了它。
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String partitionKey = (String) key;
if(Channel.DB.getValue().equalsIgnoreCase(partitionKey) && ( KafkaTopic.TRANS.getValue().equalsIgnoreCase(topic) || KafkaTopic.CONS.getValue().equalsIgnoreCase(topic) )){
return 1; // this is reserved for SERVICE X only
}
return 0; // here i want to produce messages on remaining partitions, how to return partition now?
}
问题:1:如何返回分区号在这种情况下2:如何以循环方式生成其他消息,不包括服务 X 的分区。
我正在使用 Apache Kafka 9.0.1。
最佳答案
下面的代码对我有用 - 这里的想法是,当 key 不适用于保留分区时,您可以从可用分区列表中删除该特定分区,并对剩余分区进行循环。
private final AtomicInteger counter = new AtomicInteger(0);
public static final int SPECIAL_PARTITION_ID = 1;
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
String partitionKey = (String) key;
if ("SPECIAL_CUSTOMER".equals(partitionKey)) {
LOGGER.info("PARTITION= " + SPECIAL_PARTITION_ID);
return SPECIAL_PARTITION_ID; //special partition reserved for MY_SPECIAL_CUSTOMER
} else {
int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = new ArrayList<>(cluster.availablePartitionsForTopic(topic));
if (availablePartitions.size() > 0) {
PartitionInfo specialPartition = null;
for (PartitionInfo partitionInfo : availablePartitions) {
if (partitionInfo.partition() == SPECIAL_PARTITION_ID) {
specialPartition = partitionInfo;
break;
}
}
availablePartitions.remove(specialPartition);
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
//optional -- depending upon your usecase
while (true) {
int p = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
if(p != SPECIAL_PARTITION_ID) {
return p;
}
}
}
}
}
如果您可以只确保一个键始终进入保留分区,而其他键可能会进行循环(包括特殊分区),那么您可以通过在键用于保留分区时传递partitionId来轻松实现它,否则根本不传递键,这可以节省您编写自定义分区程序。
此外,如果您不介意保留分区是最后一个分区,其余分区则分配给其他分区,则可能有一个更简单的实现(摘自《Kafka:权威指南》一书)
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class BananaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {} 1
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes,
Cluster cluster) {
List<PartitionInfo> partitions =
cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String))) 2
throw new InvalidRecordException("We expect all messages
to have customer name as key")
if (((String) key).equals("Banana"))
return numPartitions; // Banana will always go to last
partition
// Other records will get hashed to the rest of the
partitions
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
}
public void close() {}
}
关于java - 如果一个分区受到限制,如何对 kafka 中的剩余分区应用循环法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48885460/
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 想改进这个问题?将问题更新为 on-topic对于堆栈溢出。 6年前关闭。 Improve this qu
我有实体: @Entity @Table(name = "CARDS") public class Card { @ManyToOne @JoinColumn(name = "PERSON_I
我正在尝试计算二维多边形的表面法线。我正在使用 OpenGL wiki 中的 Newell 方法来计算表面法线。 https://www.opengl.org/wiki/Calculating_a_S
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 这个问题似乎与 help center 中定义的范围内的编程无关。 . 关闭 7 年前。 Improve
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 关闭 9 年前。 Improve this
我这里有以下 XML: Visa, Mastercard, , , , 0, Discover, American Express siteonly, Buyer Pay
即将发生的 Google 政策变更迫使我们实现一个对话框,以通知欧盟用户有关 Cookie/设备标识符用于广告和分析的情况。我只想向欧盟用户显示此对话框。我不想使用额外的权限(例如 android.p
本文分享自华为云社区《华为大咖说 | 企业应用AI大模型的“道、法、术” ——道:认知篇》,作者:华为云PaaS服务小智。 本期核心观点 上车:AGI是未来5~10年内,每个人都无法回避的技
我有一个与酒精相关的网站,需要先验证年龄,然后才能让他们进入该网站。我使用 HttpModule 来执行此操作,该模块检查 cookie,如果未设置,我会将它们重定向到验证页面。我验证他们的年龄并存储
在欧盟,我们有一项法律,要求网页请求存储 cookie 的许可。我们大多数人都了解 cookie 并同意它们,但仍然被迫在任何地方明确接受它们。所以我计划编写这个附加组件(ff & chrome),它
以下在 C 和/或 C++ 中是否合法? void fn(); inline void fn() { /*Do something here*/ } 让我担心的是,第一个声明看起来暗示函数将被定义
我是一名优秀的程序员,十分优秀!