- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
**摘要:**因为在我们实际的运维过程中,需要指定某个副本为ISR,但是Kafka中的Leader选举策略并不支持这个功能,所以需要我们自己来实现它。
本文分享自华为云社区《Kafka的指定副本作为Leader的三种实现方式》,作者:石臻臻的杂货铺。
前几天有个群友问到: kafka如何修改优先副本? 他们有个需求是, 想指定某个分区中的其中一个副本为Leader。
对于这么一个问题,在我们生产环境还是挺常见的,经常有需要修改某个Topic中某分区的Leader
比如 topic1-0这个分区有3个副本[0,1,2], 按照「优先副本」的规则那么 0 号副本肯定就是Leader了 我们都知道分区中的只有Leader副本才会提供读写副本其他副本作为备份 假如在某些情况下,「0」 号副本性能资源不够,或者网络不太好,或者IO压力比较大那么肯定对Topic的整体读写性能有很大影响, 这个时候切换一台压力较小副本作为Leader就显得很重要;优先副本: 分区中的AR(所有副本)信息, 优先选择排在第一位的副本作为Leader Leader机制: 分区中只有一个Leader来承担读写,其他副本只是作为备份
那么如何实现这样一个需求呢?
知道了原理之后,我们就能想到对应的解决方案了 只要将 分区的 AR 中的第一个位置,替换成你指定副本就行了;AR = { 0,1,2 } ==> AR = {2,1,0}
一般能够达到这个目的有两种方案,下面我们来分析一下
一般分区副本重分配主要有三个流程
这里我们主要看第2步骤, 来看看迁移文件一般是什么样子的
{
"version": 1,
"partitions": [{
"topic": "topic1",
"partition": 0,
"replicas": [0,1,2]
}]
}
这个迁移Json意思是, 把topic1的「0」号分区的副本分配成[0,1,2] ,也就是说 topic1-0号分区最终有3个副本分别在 {brokerId-0,brokerId-1,brokerId-2} ;
又根据Leader的选举策略得知,不管是什么策略的选择,都是按照AR的顺序来选的
AR: 副本的分配顺序
那么我们想要实现我们的需求
是不是把这个Json文件 中的 “replicas”: [0,1,2] 改一下就行了
比如改成 “replicas”: [2,1,0] ,
改完Json后执行,执行execute, 正式开始重分配流程!
迁移完成之后, 就会发现,Leader已经变成上面的第一个位置的副本「2」 了
修改完AR顺序就结束了吗?
可以说是结束了,也可以说没有结束。
上面只是修改了AR的顺序, 但是没有执行Leader选举呀,这个时候Leader还是原来的,所以我们需要主动触发一下Leader选举
## 石臻臻的杂货铺
## 微信: szzdzhp001
sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic Topic1 --election-type PREFERRED --partition 0
这样就会立马切换成我们想要的Leader了。
也可以不主动触发,等Controller自动均衡。
如果你觉得主动触发这个很麻烦,那么没有关系,那就不执行,如果你开启了自动均衡策略的话,默认是开启的。
当一个broker停止或崩溃时,这个broker中所有分区的leader将转移给其他副本。这意味着在默认情况下,当这个broker重新启动之后,它的所有分区都将仅作为follower,不再用于客户端的读写操作。
为了避免这种不平衡,Kafka有一个优先副本的概念。如果一个分区的副本列表是1,5,9,节点1将优先作为其他两个副本5和9的leader。
Controller会有一个定时任务,定期执行优先副本选举,这样就不会导致负载不均衡和资源浪费,这就是leader的自动均衡机制
优点: 实现了需求, 不需要改源码,也没有额外的开发工作。
缺点: 操作比较复杂容易出错,需要先获取原先的分区分配数据,然后手动修改Json文件,这里比较容易出错,影响会比较大,当然这些都可以通过校验接口来做好限制, 最重要的一点是 副本重分配当前只能有一个任务 !
假如你当前有一个「副本重分配」的任务在,那么这里就不能够执行了。
例如:
修改的时候请先用get获取数据,在那个基础上改,因为不同版本,里面的数据结构是不一样的,我们只需要改分区AR顺序就行了 “partitions”:{“0”:[0,1,2]}
## get zk 节点数据。
get /szz1/brokers/topics/Topic2
## zk中的修改命令
set /szz1/brokers/topics/Topic2 {"version":2,"partitions":{"0":[0,1,2]},"adding_replicas":{},"removing_replicas":{}}
为什么要删除Controller的zk节点?
之所以删除Controller节点,是因为我们手动修改了zk节点数据之后,因为没有副本的新增,是不会触发Controller去更新AR内存的,就算你主动触发Leader选举,AR还是以前的,并不会达到想要的效果。
删除zk中的/Controller节点,会触发Controller重新选举,重新选举会重新加载所有元数据,所以我们刚刚加载的数据就会生效, 同时Controller重新加载也会触发Leader选举。
简单代码
当然上面功能,手动改起来麻烦,那么饿肯定是要集成到LogiKM 3.0中的咯;
优点: 实现了目标需求, 简单, 操作方便
缺点: 频繁的Controller重选举对生产环境来说会有一些影响;
我们方案二中的问题就是需要删除/Controller节点发送重新选举,我们能不能不重新选举Controller也能生效呢?
如何让修改后的AR立即生效 ?
Controller会监听每一个topic的节点/brokers/topics/{topic名称}
KafkaController#processPartitionModifications
/**
* 石臻臻的杂货铺
* 微信:szzdzhp001
* 省略部分代码
**/
private def processPartitionModifications(topic: String): Unit = {
def restorePartitionReplicaAssignment(
topic: String,
newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
): Unit = {
val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
}
if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
} else if (partitionsToBeAdded.nonEmpty) {
info(s"New partitions to be added $partitionsToBeAdded")
partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>
controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)
}
onNewPartitionCreation(partitionsToBeAdded.keySet)
}
}
}
这段代码省略了很多,我想让你看到的是:
只有新增了副本,才会执行更新Controller的内存操作。
那么我们在这里面新增一段逻辑
新增逻辑:如果只是变更了AR的顺序,那么我们也更新一下内存。
来我们改一下源码
// 1. 找到 AR 顺序有变更的 所有TopicPartition
val partitionsOrderChange = partitionReplicaAssignment.filter { case (topicPartition, _) =>
//这里自己写下过滤逻辑 把只是顺序变更的分区找出
true
}
if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
if (partitionsToBeAdded.nonEmpty) {
} else {
}
} else if (partitionsToBeAdded.nonEmpty) {
info(s"New partitions to be added $partitionsToBeAdded")
partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>
controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)
}
onNewPartitionCreation(partitionsToBeAdded.keySet)
}else if (partitionsOrderChange.nonEmpty) {
// ② .在这里加个逻辑
info(s"OrderChange partitions to be updatecache $partitionsToBeAdded")
partitionsOrderChange.foreach { case (topicPartition, assignedReplicas) =>
controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)
}
}
改成这样之后,上面的流程就变成了
方案三改了之后会对其他的流程有影响吗?
上面更改的方法,一般是在分区副本重分配或者新增分区的时候会触发。
上面新增的逻辑并不会对现有流程有影响,因为假设都是上面的场景的情况下,他们都是会主动更新内存的。
在我看来,这里的改动,完全可以向kafka社区提一个Pr. 来“修复”这个问题。
因为提了这个PR,对我们有收益,没有额外的开销!
我正在我的 java 作业中使用 GUI,并且我必须指定 JCheckBox 中的其他内容。除了这个小要求,其他的我都完成了。我不太确定如何解决这个问题,我查阅了我的书并尝试在线研究 要求: 一系列复
在各种语言中(我将在这里使用 JavaScript,但我已经在 PHP 和 C++ 中以及可能在其他地方看到过它),似乎有几种构造简单 for 循环的方法。版本 1 如下: var top = doc
有没有一种方法可以使用 CSS 指定每次“小于符号”(在键盘上 M 的右侧)或“大于符号”出现在文本中时,它应该被替换为分别是“小于”或“大于”的实际词? 最佳答案 CSS 不能作用于(不能修改,即)
首先,使用 setspn 命令为用户注册服务主体名称。 setspn -a CS/dummy@abc.com dummyuser setspn -l dummyuser 给出输出为 CS/dummy@
我在指定从 SFSafariViewController 访问时遇到问题,因为它具有与 Safari 浏览器完全相同的用户代理。 我要做的是仅在 webview 内显示图片,如果在普通浏览器上查看,则
我正在尝试用 R 语言在 lavaan 中指定一个奇怪的模型。该模型如下所示: 我的规范尝试如下所示。我发现难以实现的是将观察到的变量的唯一误差固定为唯一项的两个相关性的总和。 例如,项目 y*1,2
我正在构建 API 以将我的 React 应用程序与我的后端服务连接起来,我想使用 typescript 来指定 data 的类型在我的 Axios 请求中。如何在不修改其他字段的情况下更新 Axio
如何为模型指定初始“软”值?该初始模型是解决类似查询的结果,并且该模型很可能具有正确的部分,甚至对于当前查询可能是正确的。 目前,我正在通过增量求解和 hard/soft constraints 对此
我有来自网页的以下代码 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example 似乎缺少的是如何配置分区数。我
有没有办法在每个查询的基础上在 Neo4jClient 中指定 Cypher 解析器的版本,如 here 所述? 谢谢! 最佳答案 如果您将 Neo4jClient 更新到最新版本(> 1.0.0.6
我有以下代码生成四个图,但它们最终被压扁(见下图)。我该如何解决这个问题? par(mfrow=c(2,2)) curve(.5*exp(-.5*x),from=0,to=10,main="f(x)"
我有一个 ColdFusion 10 服务器。我正在使用 JDBC 驱动程序连接到 db2 数据库。我偶然发现了这个笔记。这个设置在哪里?我还查看了 neo*.xml 文件,但没有看到任何 db 驱动
我想知道是否可以指定验证器的运行顺序。 目前,我编写了一个自定义验证器,检查它是否为 [a-zA-Z0-9]+ 以确保登录验证我们的规则,并编写了一个远程验证器以确保登录可用,但目前远程验证器已启动在
我的应用程序需要至少 40MB 的 RAM,因此早期的 iPhone(例如 3G、第一个 iPod touch 版本)就没有它(它们为我的应用程序提供的最大内存约为 20MB)。有没有正确的方法来禁用
我有一个保存日期(不是当前日期)的 Date 对象,我需要以某种方式指定该日期为 UTC,然后将其转换为“欧洲/巴黎”,即 +1 小时。 public static LocalDateTime toL
我想问你在 Varnish 代码中如何在没有缓存的情况下将请求传递到后端。 我知道我可以做到并且正在发挥作用: if (req.url ~ "(\?|&)(something|somethin
我目前基于模块编译程序(如主程序 foo 依赖于模块 bar )如下: gfortran -c bar.f90 gfortran -o foo.exe foo.f90 bar.o 这在 foo.f90
我正在尝试创建一个依赖于另一个 meteor 包的新 meteor 包。当我尝试 meteor add mypackage 时,出现以下错误。为什么 Meteor 不添加 mypackage 并引入它
我正在制作执行器/ react 器,同时发现这是一个终生的问题。它与 async/Future 无关,可以在没有 async 糖的情况下进行复制。 use std::future::Future; s
我在 cassandra 中有一个表,其数据类型为时间戳。我正在使用 cqlsh 从数据库中获取数据,并希望更改我的时间戳列输出的输出格式。我研究了一下,发现我可以通过更改以下文件来更改时间戳输出格式
我是一名优秀的程序员,十分优秀!